wip: migrate garage status and garage layout assign
This commit is contained in:
parent
145130481e
commit
69ddaafc60
8 changed files with 486 additions and 355 deletions
|
@ -1,10 +1,5 @@
|
|||
use std::collections::{HashMap, HashSet};
|
||||
use std::time::Duration;
|
||||
|
||||
use format_table::format_table;
|
||||
use garage_util::error::*;
|
||||
|
||||
use garage_rpc::layout::*;
|
||||
use garage_rpc::system::*;
|
||||
use garage_rpc::*;
|
||||
|
||||
|
@ -13,204 +8,6 @@ use garage_model::helper::error::Error as HelperError;
|
|||
use crate::admin::*;
|
||||
use crate::cli::*;
|
||||
|
||||
pub async fn cli_command_dispatch(
|
||||
cmd: Command,
|
||||
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
|
||||
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
) -> Result<(), HelperError> {
|
||||
match cmd {
|
||||
Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?),
|
||||
Command::Node(NodeOperation::Connect(connect_opt)) => {
|
||||
Ok(cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await?)
|
||||
}
|
||||
Command::Layout(layout_opt) => {
|
||||
Ok(cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await?)
|
||||
}
|
||||
Command::Bucket(bo) => {
|
||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
|
||||
}
|
||||
Command::Key(ko) => {
|
||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
|
||||
}
|
||||
Command::Repair(ro) => {
|
||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
|
||||
}
|
||||
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
|
||||
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
|
||||
Command::Block(bo) => {
|
||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
|
||||
}
|
||||
Command::Meta(mo) => {
|
||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
|
||||
let status = fetch_status(rpc_cli, rpc_host).await?;
|
||||
let layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||
|
||||
println!("==== HEALTHY NODES ====");
|
||||
let mut healthy_nodes =
|
||||
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
|
||||
for adv in status.iter().filter(|adv| adv.is_up) {
|
||||
let host = adv.status.hostname.as_deref().unwrap_or("?");
|
||||
let addr = match adv.addr {
|
||||
Some(addr) => addr.to_string(),
|
||||
None => "N/A".to_string(),
|
||||
};
|
||||
if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
|
||||
let data_avail = match &adv.status.data_disk_avail {
|
||||
_ if cfg.capacity.is_none() => "N/A".into(),
|
||||
Some((avail, total)) => {
|
||||
let pct = (*avail as f64) / (*total as f64) * 100.;
|
||||
let avail = bytesize::ByteSize::b(*avail);
|
||||
format!("{} ({:.1}%)", avail, pct)
|
||||
}
|
||||
None => "?".into(),
|
||||
};
|
||||
healthy_nodes.push(format!(
|
||||
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
|
||||
id = adv.id,
|
||||
host = host,
|
||||
addr = addr,
|
||||
tags = cfg.tags.join(","),
|
||||
zone = cfg.zone,
|
||||
capacity = cfg.capacity_string(),
|
||||
data_avail = data_avail,
|
||||
));
|
||||
} else {
|
||||
let prev_role = layout
|
||||
.versions
|
||||
.iter()
|
||||
.rev()
|
||||
.find_map(|x| match x.roles.get(&adv.id) {
|
||||
Some(NodeRoleV(Some(cfg))) => Some(cfg),
|
||||
_ => None,
|
||||
});
|
||||
if let Some(cfg) = prev_role {
|
||||
healthy_nodes.push(format!(
|
||||
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
|
||||
id = adv.id,
|
||||
host = host,
|
||||
addr = addr,
|
||||
tags = cfg.tags.join(","),
|
||||
zone = cfg.zone,
|
||||
));
|
||||
} else {
|
||||
let new_role = match layout.staging.get().roles.get(&adv.id) {
|
||||
Some(NodeRoleV(Some(_))) => "pending...",
|
||||
_ => "NO ROLE ASSIGNED",
|
||||
};
|
||||
healthy_nodes.push(format!(
|
||||
"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
|
||||
id = adv.id,
|
||||
h = host,
|
||||
addr = addr,
|
||||
new_role = new_role,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
format_table(healthy_nodes);
|
||||
|
||||
// Determine which nodes are unhealthy and print that to stdout
|
||||
let status_map = status
|
||||
.iter()
|
||||
.map(|adv| (adv.id, adv))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let tf = timeago::Formatter::new();
|
||||
let mut drain_msg = false;
|
||||
let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
|
||||
let mut listed = HashSet::new();
|
||||
for ver in layout.versions.iter().rev() {
|
||||
for (node, _, role) in ver.roles.items().iter() {
|
||||
let cfg = match role {
|
||||
NodeRoleV(Some(role)) if role.capacity.is_some() => role,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
if listed.contains(node) {
|
||||
continue;
|
||||
}
|
||||
listed.insert(*node);
|
||||
|
||||
let adv = status_map.get(node);
|
||||
if adv.map(|x| x.is_up).unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Node is in a layout version, is not a gateway node, and is not up:
|
||||
// it is in a failed state, add proper line to the output
|
||||
let (host, last_seen) = match adv {
|
||||
Some(adv) => (
|
||||
adv.status.hostname.as_deref().unwrap_or("?"),
|
||||
adv.last_seen_secs_ago
|
||||
.map(|s| tf.convert(Duration::from_secs(s)))
|
||||
.unwrap_or_else(|| "never seen".into()),
|
||||
),
|
||||
None => ("??", "never seen".into()),
|
||||
};
|
||||
let capacity = if ver.version == layout.current().version {
|
||||
cfg.capacity_string()
|
||||
} else {
|
||||
drain_msg = true;
|
||||
"draining metadata...".to_string()
|
||||
};
|
||||
failed_nodes.push(format!(
|
||||
"{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
|
||||
id = node,
|
||||
host = host,
|
||||
tags = cfg.tags.join(","),
|
||||
zone = cfg.zone,
|
||||
capacity = capacity,
|
||||
last_seen = last_seen,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if failed_nodes.len() > 1 {
|
||||
println!("\n==== FAILED NODES ====");
|
||||
format_table(failed_nodes);
|
||||
if drain_msg {
|
||||
println!();
|
||||
println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
|
||||
println!("If these nodes are definitely dead, please review the layout history with");
|
||||
println!(
|
||||
"`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if print_staging_role_changes(&layout) {
|
||||
println!();
|
||||
println!("Please use `garage layout show` to check the proposed new layout and apply it.");
|
||||
println!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_connect(
|
||||
rpc_cli: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
args: ConnectNodeOpt,
|
||||
) -> Result<(), Error> {
|
||||
match rpc_cli
|
||||
.call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
|
||||
.await??
|
||||
{
|
||||
SystemRpc::Ok => {
|
||||
println!("Success.");
|
||||
Ok(())
|
||||
}
|
||||
m => Err(Error::unexpected_rpc_message(m)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_admin(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
|
|
|
@ -10,147 +10,6 @@ use garage_rpc::*;
|
|||
|
||||
use crate::cli::*;
|
||||
|
||||
pub async fn cli_layout_command_dispatch(
|
||||
cmd: LayoutOperation,
|
||||
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
) -> Result<(), Error> {
|
||||
match cmd {
|
||||
LayoutOperation::Assign(assign_opt) => {
|
||||
cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
|
||||
}
|
||||
LayoutOperation::Remove(remove_opt) => {
|
||||
cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
|
||||
}
|
||||
LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await,
|
||||
LayoutOperation::Apply(apply_opt) => {
|
||||
cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await
|
||||
}
|
||||
LayoutOperation::Revert(revert_opt) => {
|
||||
cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
|
||||
}
|
||||
LayoutOperation::Config(config_opt) => {
|
||||
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
|
||||
}
|
||||
LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
|
||||
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
|
||||
cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_assign_role(
|
||||
rpc_cli: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
args: AssignRoleOpt,
|
||||
) -> Result<(), Error> {
|
||||
let status = match rpc_cli
|
||||
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
|
||||
.await??
|
||||
{
|
||||
SystemRpc::ReturnKnownNodes(nodes) => nodes,
|
||||
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
|
||||
};
|
||||
|
||||
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||
let all_nodes = layout.get_all_nodes();
|
||||
|
||||
let added_nodes = args
|
||||
.node_ids
|
||||
.iter()
|
||||
.map(|node_id| {
|
||||
find_matching_node(
|
||||
status
|
||||
.iter()
|
||||
.map(|adv| adv.id)
|
||||
.chain(all_nodes.iter().cloned()),
|
||||
node_id,
|
||||
)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let mut roles = layout.current().roles.clone();
|
||||
roles.merge(&layout.staging.get().roles);
|
||||
|
||||
for replaced in args.replace.iter() {
|
||||
let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
|
||||
match roles.get(&replaced_node) {
|
||||
Some(NodeRoleV(Some(_))) => {
|
||||
layout
|
||||
.staging
|
||||
.get_mut()
|
||||
.roles
|
||||
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
|
||||
}
|
||||
_ => {
|
||||
return Err(Error::Message(format!(
|
||||
"Cannot replace node {:?} as it is not currently in planned layout",
|
||||
replaced_node
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if args.capacity.is_some() && args.gateway {
|
||||
return Err(Error::Message(
|
||||
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
|
||||
}
|
||||
if args.capacity == Some(ByteSize::b(0)) {
|
||||
return Err(Error::Message("Invalid capacity value: 0".into()));
|
||||
}
|
||||
|
||||
for added_node in added_nodes {
|
||||
let new_entry = match roles.get(&added_node) {
|
||||
Some(NodeRoleV(Some(old))) => {
|
||||
let capacity = match args.capacity {
|
||||
Some(c) => Some(c.as_u64()),
|
||||
None if args.gateway => None,
|
||||
None => old.capacity,
|
||||
};
|
||||
let tags = if args.tags.is_empty() {
|
||||
old.tags.clone()
|
||||
} else {
|
||||
args.tags.clone()
|
||||
};
|
||||
NodeRole {
|
||||
zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
|
||||
capacity,
|
||||
tags,
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let capacity = match args.capacity {
|
||||
Some(c) => Some(c.as_u64()),
|
||||
None if args.gateway => None,
|
||||
None => return Err(Error::Message(
|
||||
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
|
||||
};
|
||||
NodeRole {
|
||||
zone: args
|
||||
.zone
|
||||
.clone()
|
||||
.ok_or("Please specify a zone with the -z flag")?,
|
||||
capacity,
|
||||
tags: args.tags.clone(),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
layout
|
||||
.staging
|
||||
.get_mut()
|
||||
.roles
|
||||
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
|
||||
}
|
||||
|
||||
send_layout(rpc_cli, rpc_host, layout).await?;
|
||||
|
||||
println!("Role changes are staged but not yet committed.");
|
||||
println!("Use `garage layout show` to view staged role changes,");
|
||||
println!("and `garage layout apply` to enact staged changes.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_remove_role(
|
||||
rpc_cli: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
|
|
|
@ -8,6 +8,5 @@ pub(crate) mod convert_db;
|
|||
|
||||
pub(crate) use cmd::*;
|
||||
pub(crate) use init::*;
|
||||
pub(crate) use layout::*;
|
||||
pub(crate) use structs::*;
|
||||
pub(crate) use util::*;
|
||||
|
|
188
src/garage/cli_v2/cluster.rs
Normal file
188
src/garage/cli_v2/cluster.rs
Normal file
|
@ -0,0 +1,188 @@
|
|||
use format_table::format_table;
|
||||
|
||||
use garage_util::error::*;
|
||||
|
||||
use garage_api::admin::api::*;
|
||||
|
||||
use crate::cli::structs::*;
|
||||
use crate::cli_v2::util::*;
|
||||
use crate::cli_v2::*;
|
||||
|
||||
impl Cli {
|
||||
pub async fn cmd_status(&self) -> Result<(), Error> {
|
||||
let status = self.api_request(GetClusterStatusRequest).await?;
|
||||
let layout = self.api_request(GetClusterLayoutRequest).await?;
|
||||
// TODO: layout history
|
||||
|
||||
println!("==== HEALTHY NODES ====");
|
||||
let mut healthy_nodes =
|
||||
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
|
||||
for adv in status.nodes.iter().filter(|adv| adv.is_up) {
|
||||
let host = adv.hostname.as_deref().unwrap_or("?");
|
||||
let addr = match adv.addr {
|
||||
Some(addr) => addr.to_string(),
|
||||
None => "N/A".to_string(),
|
||||
};
|
||||
if let Some(cfg) = &adv.role {
|
||||
let data_avail = match &adv.data_partition {
|
||||
_ if cfg.capacity.is_none() => "N/A".into(),
|
||||
Some(FreeSpaceResp { available, total }) => {
|
||||
let pct = (*available as f64) / (*total as f64) * 100.;
|
||||
let avail_str = bytesize::ByteSize::b(*available);
|
||||
format!("{} ({:.1}%)", avail_str, pct)
|
||||
}
|
||||
None => "?".into(),
|
||||
};
|
||||
healthy_nodes.push(format!(
|
||||
"{id:.16}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
|
||||
id = adv.id,
|
||||
host = host,
|
||||
addr = addr,
|
||||
tags = cfg.tags.join(","),
|
||||
zone = cfg.zone,
|
||||
capacity = capacity_string(cfg.capacity),
|
||||
data_avail = data_avail,
|
||||
));
|
||||
} else {
|
||||
/*
|
||||
let prev_role = layout
|
||||
.versions
|
||||
.iter()
|
||||
.rev()
|
||||
.find_map(|x| match x.roles.get(&adv.id) {
|
||||
Some(NodeRoleV(Some(cfg))) => Some(cfg),
|
||||
_ => None,
|
||||
});
|
||||
*/
|
||||
let prev_role = Option::<NodeRoleResp>::None; //TODO
|
||||
if let Some(cfg) = prev_role {
|
||||
healthy_nodes.push(format!(
|
||||
"{id:.16}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
|
||||
id = adv.id,
|
||||
host = host,
|
||||
addr = addr,
|
||||
tags = cfg.tags.join(","),
|
||||
zone = cfg.zone,
|
||||
));
|
||||
} else {
|
||||
let new_role = match layout.staged_role_changes.iter().find(|x| x.id == adv.id)
|
||||
{
|
||||
Some(_) => "pending...",
|
||||
_ => "NO ROLE ASSIGNED",
|
||||
};
|
||||
healthy_nodes.push(format!(
|
||||
"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
|
||||
id = adv.id,
|
||||
h = host,
|
||||
addr = addr,
|
||||
new_role = new_role,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
format_table(healthy_nodes);
|
||||
|
||||
// Determine which nodes are unhealthy and print that to stdout
|
||||
// TODO: do we need this, or can it be done in the GetClusterStatus handler?
|
||||
let status_map = status
|
||||
.nodes
|
||||
.iter()
|
||||
.map(|adv| (&adv.id, adv))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let tf = timeago::Formatter::new();
|
||||
let mut drain_msg = false;
|
||||
let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
|
||||
let mut listed = HashSet::new();
|
||||
//for ver in layout.versions.iter().rev() {
|
||||
for ver in [&layout].iter() {
|
||||
for cfg in ver.roles.iter() {
|
||||
let node = &cfg.id;
|
||||
if listed.contains(node.as_str()) {
|
||||
continue;
|
||||
}
|
||||
listed.insert(node.as_str());
|
||||
|
||||
let adv = status_map.get(node);
|
||||
if adv.map(|x| x.is_up).unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Node is in a layout version, is not a gateway node, and is not up:
|
||||
// it is in a failed state, add proper line to the output
|
||||
let (host, last_seen) = match adv {
|
||||
Some(adv) => (
|
||||
adv.hostname.as_deref().unwrap_or("?"),
|
||||
adv.last_seen_secs_ago
|
||||
.map(|s| tf.convert(Duration::from_secs(s)))
|
||||
.unwrap_or_else(|| "never seen".into()),
|
||||
),
|
||||
None => ("??", "never seen".into()),
|
||||
};
|
||||
/*
|
||||
let capacity = if ver.version == layout.current().version {
|
||||
cfg.capacity_string()
|
||||
} else {
|
||||
drain_msg = true;
|
||||
"draining metadata...".to_string()
|
||||
};
|
||||
*/
|
||||
let capacity = capacity_string(cfg.capacity);
|
||||
|
||||
failed_nodes.push(format!(
|
||||
"{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
|
||||
id = node,
|
||||
host = host,
|
||||
tags = cfg.tags.join(","),
|
||||
zone = cfg.zone,
|
||||
capacity = capacity,
|
||||
last_seen = last_seen,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if failed_nodes.len() > 1 {
|
||||
println!("\n==== FAILED NODES ====");
|
||||
format_table(failed_nodes);
|
||||
if drain_msg {
|
||||
println!();
|
||||
println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
|
||||
println!(
|
||||
"If these nodes are definitely dead, please review the layout history with"
|
||||
);
|
||||
println!(
|
||||
"`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if print_staging_role_changes(&layout) {
|
||||
println!();
|
||||
println!(
|
||||
"Please use `garage layout show` to check the proposed new layout and apply it."
|
||||
);
|
||||
println!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_connect(&self, opt: ConnectNodeOpt) -> Result<(), Error> {
|
||||
let res = self
|
||||
.api_request(ConnectClusterNodesRequest(vec![opt.node]))
|
||||
.await?;
|
||||
if res.0.len() != 1 {
|
||||
return Err(Error::Message(format!("unexpected response: {:?}", res)));
|
||||
}
|
||||
let res = res.0.into_iter().next().unwrap();
|
||||
if res.success {
|
||||
println!("Success.");
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::Message(format!(
|
||||
"Failure: {}",
|
||||
res.error.unwrap_or_default()
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
119
src/garage/cli_v2/layout.rs
Normal file
119
src/garage/cli_v2/layout.rs
Normal file
|
@ -0,0 +1,119 @@
|
|||
use bytesize::ByteSize;
|
||||
use format_table::format_table;
|
||||
|
||||
use garage_util::error::*;
|
||||
|
||||
use garage_api::admin::api::*;
|
||||
|
||||
use crate::cli::layout as cli_v1;
|
||||
use crate::cli::structs::*;
|
||||
use crate::cli_v2::util::*;
|
||||
use crate::cli_v2::*;
|
||||
|
||||
impl Cli {
|
||||
pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> {
|
||||
match cmd {
|
||||
LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await,
|
||||
|
||||
// TODO
|
||||
LayoutOperation::Remove(remove_opt) => {
|
||||
cli_v1::cmd_remove_role(&self.system_rpc_endpoint, self.rpc_host, remove_opt).await
|
||||
}
|
||||
LayoutOperation::Show => {
|
||||
cli_v1::cmd_show_layout(&self.system_rpc_endpoint, self.rpc_host).await
|
||||
}
|
||||
LayoutOperation::Apply(apply_opt) => {
|
||||
cli_v1::cmd_apply_layout(&self.system_rpc_endpoint, self.rpc_host, apply_opt).await
|
||||
}
|
||||
LayoutOperation::Revert(revert_opt) => {
|
||||
cli_v1::cmd_revert_layout(&self.system_rpc_endpoint, self.rpc_host, revert_opt)
|
||||
.await
|
||||
}
|
||||
LayoutOperation::Config(config_opt) => {
|
||||
cli_v1::cmd_config_layout(&self.system_rpc_endpoint, self.rpc_host, config_opt)
|
||||
.await
|
||||
}
|
||||
LayoutOperation::History => {
|
||||
cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await
|
||||
}
|
||||
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
|
||||
cli_v1::cmd_layout_skip_dead_nodes(
|
||||
&self.system_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
assume_sync_opt,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> {
|
||||
let status = self.api_request(GetClusterStatusRequest).await?;
|
||||
let layout = self.api_request(GetClusterLayoutRequest).await?;
|
||||
|
||||
let all_node_ids_iter = status
|
||||
.nodes
|
||||
.iter()
|
||||
.map(|x| x.id.as_str())
|
||||
.chain(layout.roles.iter().map(|x| x.id.as_str()));
|
||||
|
||||
let mut actions = vec![];
|
||||
|
||||
for node in opt.replace.iter() {
|
||||
let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
|
||||
|
||||
actions.push(NodeRoleChange {
|
||||
id,
|
||||
action: NodeRoleChangeEnum::Remove { remove: true },
|
||||
});
|
||||
}
|
||||
|
||||
for node in opt.node_ids.iter() {
|
||||
let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
|
||||
|
||||
let current = get_staged_or_current_role(&id, &layout);
|
||||
|
||||
let zone = opt
|
||||
.zone
|
||||
.clone()
|
||||
.or_else(|| current.as_ref().map(|c| c.zone.clone()))
|
||||
.ok_or_message("Please specify a zone with the -z flag")?;
|
||||
|
||||
let capacity = if opt.gateway {
|
||||
if opt.capacity.is_some() {
|
||||
return Err(Error::Message("Please specify only -c or -g".into()));
|
||||
}
|
||||
None
|
||||
} else if let Some(cap) = opt.capacity {
|
||||
Some(cap.as_u64())
|
||||
} else {
|
||||
current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity
|
||||
};
|
||||
|
||||
let tags = if !opt.tags.is_empty() {
|
||||
opt.tags.clone()
|
||||
} else if let Some(cur) = current.as_ref() {
|
||||
cur.tags.clone()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
actions.push(NodeRoleChange {
|
||||
id,
|
||||
action: NodeRoleChangeEnum::Update {
|
||||
zone,
|
||||
capacity,
|
||||
tags,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
self.api_request(UpdateClusterLayoutRequest(actions))
|
||||
.await?;
|
||||
|
||||
println!("Role changes are staged but not yet committed.");
|
||||
println!("Use `garage layout show` to view staged role changes,");
|
||||
println!("and `garage layout apply` to enact staged changes.");
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,12 +1,15 @@
|
|||
pub mod util;
|
||||
|
||||
pub mod cluster;
|
||||
pub mod layout;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::convert::TryFrom;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use format_table::format_table;
|
||||
use garage_util::error::*;
|
||||
|
||||
use garage_rpc::layout::*;
|
||||
use garage_rpc::system::*;
|
||||
use garage_rpc::*;
|
||||
|
||||
|
@ -14,7 +17,9 @@ use garage_api::admin::api::*;
|
|||
use garage_api::admin::EndpointHandler as AdminApiEndpoint;
|
||||
|
||||
use crate::admin::*;
|
||||
use crate::cli::*;
|
||||
use crate::cli as cli_v1;
|
||||
use crate::cli::structs::*;
|
||||
use crate::cli::Command;
|
||||
|
||||
pub struct Cli {
|
||||
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
|
||||
|
@ -24,13 +29,64 @@ pub struct Cli {
|
|||
|
||||
impl Cli {
|
||||
pub async fn handle(&self, cmd: Command) -> Result<(), Error> {
|
||||
println!("{:?}", self.api_request(GetClusterStatusRequest).await?);
|
||||
Ok(())
|
||||
/*
|
||||
match cmd {
|
||||
_ => todo!(),
|
||||
Command::Status => self.cmd_status().await,
|
||||
Command::Node(NodeOperation::Connect(connect_opt)) => {
|
||||
self.cmd_connect(connect_opt).await
|
||||
}
|
||||
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
|
||||
|
||||
// TODO
|
||||
Command::Bucket(bo) => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::BucketOperation(bo),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("xoxo"),
|
||||
Command::Key(ko) => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::KeyOperation(ko),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("xoxo"),
|
||||
Command::Repair(ro) => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::LaunchRepair(ro),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("xoxo"),
|
||||
Command::Stats(so) => {
|
||||
cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
|
||||
.await
|
||||
.ok_or_message("xoxo")
|
||||
}
|
||||
Command::Worker(wo) => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::Worker(wo),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("xoxo"),
|
||||
Command::Block(bo) => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::BlockOperation(bo),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("xoxo"),
|
||||
Command::Meta(mo) => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::MetaOperation(mo),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("xoxo"),
|
||||
|
||||
_ => unreachable!(),
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
|
||||
|
|
115
src/garage/cli_v2/util.rs
Normal file
115
src/garage/cli_v2/util.rs
Normal file
|
@ -0,0 +1,115 @@
|
|||
use bytesize::ByteSize;
|
||||
use format_table::format_table;
|
||||
|
||||
use garage_util::error::Error;
|
||||
|
||||
use garage_api::admin::api::*;
|
||||
|
||||
pub fn capacity_string(v: Option<u64>) -> String {
|
||||
match v {
|
||||
Some(c) => ByteSize::b(c).to_string_as(false),
|
||||
None => "gateway".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_staged_or_current_role(
|
||||
id: &str,
|
||||
layout: &GetClusterLayoutResponse,
|
||||
) -> Option<NodeRoleResp> {
|
||||
for node in layout.staged_role_changes.iter() {
|
||||
if node.id == id {
|
||||
return match &node.action {
|
||||
NodeRoleChangeEnum::Remove { .. } => None,
|
||||
NodeRoleChangeEnum::Update {
|
||||
zone,
|
||||
capacity,
|
||||
tags,
|
||||
} => Some(NodeRoleResp {
|
||||
id: id.to_string(),
|
||||
zone: zone.to_string(),
|
||||
capacity: *capacity,
|
||||
tags: tags.clone(),
|
||||
}),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
for node in layout.roles.iter() {
|
||||
if node.id == id {
|
||||
return Some(node.clone());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub fn find_matching_node<'a>(
|
||||
cand: impl std::iter::Iterator<Item = &'a str>,
|
||||
pattern: &'a str,
|
||||
) -> Result<String, Error> {
|
||||
let mut candidates = vec![];
|
||||
for c in cand {
|
||||
if c.starts_with(pattern) && !candidates.contains(&c) {
|
||||
candidates.push(c);
|
||||
}
|
||||
}
|
||||
if candidates.len() != 1 {
|
||||
Err(Error::Message(format!(
|
||||
"{} nodes match '{}'",
|
||||
candidates.len(),
|
||||
pattern,
|
||||
)))
|
||||
} else {
|
||||
Ok(candidates[0].to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool {
|
||||
let has_role_changes = !layout.staged_role_changes.is_empty();
|
||||
|
||||
// TODO!! Layout parameters
|
||||
let has_layout_changes = false;
|
||||
|
||||
if has_role_changes || has_layout_changes {
|
||||
println!();
|
||||
println!("==== STAGED ROLE CHANGES ====");
|
||||
if has_role_changes {
|
||||
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
|
||||
for change in layout.staged_role_changes.iter() {
|
||||
match &change.action {
|
||||
NodeRoleChangeEnum::Update {
|
||||
tags,
|
||||
zone,
|
||||
capacity,
|
||||
} => {
|
||||
let tags = tags.join(",");
|
||||
table.push(format!(
|
||||
"{:.16}\t{}\t{}\t{}",
|
||||
change.id,
|
||||
tags,
|
||||
zone,
|
||||
capacity_string(*capacity),
|
||||
));
|
||||
}
|
||||
NodeRoleChangeEnum::Remove { .. } => {
|
||||
table.push(format!("{:.16}\tREMOVED", change.id));
|
||||
}
|
||||
}
|
||||
}
|
||||
format_table(table);
|
||||
println!();
|
||||
}
|
||||
//TODO
|
||||
/*
|
||||
if has_layout_changes {
|
||||
println!(
|
||||
"Zone redundancy: {}",
|
||||
staging.parameters.get().zone_redundancy
|
||||
);
|
||||
}
|
||||
*/
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
|
@ -35,8 +35,6 @@ use garage_util::error::*;
|
|||
use garage_rpc::system::*;
|
||||
use garage_rpc::*;
|
||||
|
||||
use garage_model::helper::error::Error as HelperError;
|
||||
|
||||
use admin::*;
|
||||
use cli::*;
|
||||
use secrets::Secrets;
|
||||
|
|
Loading…
Add table
Reference in a new issue