Move things around, improvements to CLI

This commit is contained in:
Alex 2021-10-21 17:29:27 +02:00
parent b7eccf5264
commit bff5333c62
No known key found for this signature in database
GPG Key ID: EDABF9711E244EB1
15 changed files with 854 additions and 746 deletions

View File

@ -65,7 +65,7 @@ in let
*/
''^(src|tests)'' # fixed default
''.*\.(rs|toml)$'' # fixed default
''^(crdt|replication)'' # our crate submodules
''^(crdt|replication|cli)'' # our crate submodules
];
};

View File

@ -83,7 +83,7 @@ impl Error {
Error::NotFound => StatusCode::NOT_FOUND,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
Error::InternalError(
GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_),
GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(_, _, _),
) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
StatusCode::INTERNAL_SERVER_ERROR
@ -98,7 +98,7 @@ impl Error {
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::InternalError(
GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_),
GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(_, _, _),
) => "ServiceUnavailable",
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
_ => "InvalidRequest",

View File

@ -1,661 +0,0 @@
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::data::Uuid;
use garage_util::error::Error;
use garage_rpc::ring::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use crate::admin_rpc::*;
#[derive(StructOpt, Debug)]
pub enum Command {
/// Run Garage server
#[structopt(name = "server")]
Server,
/// Print identifier (public key) of this garage node.
/// Generates a new keypair if necessary.
#[structopt(name = "node-id")]
NodeId(NodeIdOpt),
/// Get network status
#[structopt(name = "status")]
Status,
/// Garage node operations
#[structopt(name = "node")]
Node(NodeOperation),
/// Bucket operations
#[structopt(name = "bucket")]
Bucket(BucketOperation),
/// Key operations
#[structopt(name = "key")]
Key(KeyOperation),
/// Start repair of node data
#[structopt(name = "repair")]
Repair(RepairOpt),
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
}
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
/// Connect to Garage node that is currently isolated from the system
#[structopt(name = "connect")]
Connect(ConnectNodeOpt),
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureNodeOpt),
/// Remove Garage node from cluster
#[structopt(name = "remove")]
Remove(RemoveNodeOpt),
}
#[derive(StructOpt, Debug)]
pub struct NodeIdOpt {
/// Do not print usage instructions to stderr
#[structopt(short = "q", long = "quiet")]
pub(crate) quiet: bool,
}
#[derive(StructOpt, Debug)]
pub struct ConnectNodeOpt {
/// Node public key and address, in the format:
/// `<public key hexadecimal>@<ip or hostname>:<port>`
node: String,
}
#[derive(StructOpt, Debug)]
pub struct ConfigureNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
/// Location (zone or datacenter) of the node
#[structopt(short = "z", long = "zone")]
zone: Option<String>,
/// Capacity (in relative terms, use 1 to represent your smallest server)
#[structopt(short = "c", long = "capacity")]
capacity: Option<u32>,
/// Gateway-only node
#[structopt(short = "g", long = "gateway")]
gateway: bool,
/// Optional node tag
#[structopt(short = "t", long = "tag")]
tag: Option<String>,
/// Replaced node(s): list of node IDs that will be removed from the current cluster
#[structopt(long = "replace")]
replace: Vec<String>,
}
#[derive(StructOpt, Debug)]
pub struct RemoveNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
/// If this flag is not given, the node won't be removed
#[structopt(long = "yes")]
yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list")]
List,
/// Get bucket info
#[structopt(name = "info")]
Info(BucketOpt),
/// Create bucket
#[structopt(name = "create")]
Create(BucketOpt),
/// Delete bucket
#[structopt(name = "delete")]
Delete(DeleteBucketOpt),
/// Allow key to read or write to bucket
#[structopt(name = "allow")]
Allow(PermBucketOpt),
/// Deny key from reading or writing to bucket
#[structopt(name = "deny")]
Deny(PermBucketOpt),
/// Expose as website or not
#[structopt(name = "website")]
Website(WebsiteOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
pub allow: bool,
/// Delete
#[structopt(long = "deny")]
pub deny: bool,
/// Bucket name
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
/// If this flag is not given, the bucket won't be deleted
#[structopt(long = "yes")]
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
pub key_pattern: String,
/// Allow/deny read operations
#[structopt(long = "read")]
pub read: bool,
/// Allow/deny write operations
#[structopt(long = "write")]
pub write: bool,
/// Bucket name
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list")]
List,
/// Get key info
#[structopt(name = "info")]
Info(KeyOpt),
/// Create new key
#[structopt(name = "new")]
New(KeyNewOpt),
/// Rename key
#[structopt(name = "rename")]
Rename(KeyRenameOpt),
/// Delete key
#[structopt(name = "delete")]
Delete(KeyDeleteOpt),
/// Import key
#[structopt(name = "import")]
Import(KeyImportOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyOpt {
/// ID or name of the key
pub key_pattern: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(long = "name", default_value = "Unnamed key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
/// New name of the key
pub new_name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
/// Confirm deletion
#[structopt(long = "yes")]
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
/// Secret access key
pub secret_key: String,
/// Key name
#[structopt(short = "n", default_value = "Imported key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
pub yes: bool,
#[structopt(subcommand)]
pub what: Option<RepairWhat>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Only do a full sync of metadata tables
#[structopt(name = "tables")]
Tables,
/// Only repair (resync/rebalance) the set of stored blocks
#[structopt(name = "blocks")]
Blocks,
/// Only redo the propagation of object deletions to the version table (slow)
#[structopt(name = "versions")]
Versions,
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
#[structopt(name = "block_refs")]
BlockRefs,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
/// Gather detailed statistics (this can be long)
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}
pub async fn cli_cmd(
cmd: Command,
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
Command::Node(NodeOperation::Connect(connect_opt)) => {
cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
}
Command::Node(NodeOperation::Configure(configure_opt)) => {
cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
Command::Node(NodeOperation::Remove(remove_opt)) => {
cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
}
Command::Key(ko) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
}
Command::Repair(ro) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
_ => unreachable!(),
}
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
println!("==== HEALTHY NODES ====");
let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
healthy_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
));
} else {
healthy_nodes.push(format!(
"{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED",
id = adv.id,
h = adv.status.hostname,
addr = adv.addr,
));
}
}
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
let failure_case_1 = status.iter().any(|adv| !adv.is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
last_seen = "FIXME", // FIXME was (now_msec() - adv.last_seen) / 1000,
));
}
}
for (id, cfg) in config.members.iter() {
if !status.iter().any(|adv| adv.id == *id) {
failed_nodes.push(format!(
"{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
id = id,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
));
}
}
format_table(failed_nodes);
}
Ok(())
}
pub async fn cmd_connect(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
println!("Success.");
Ok(())
}
r => Err(Error::BadRpc(format!("Unexpected response: {:?}", r))),
}
}
pub async fn cmd_configure(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not in current configuration",
replaced_node
)));
}
}
if args.capacity.is_some() && args.gateway {
return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
}
if args.capacity == Some(0) {
return Err(Error::Message("Invalid capacity value: 0".into()));
}
let new_entry = match config.members.get(&added_node) {
None => {
let capacity = match args.capacity {
Some(c) => Some(c),
None if args.gateway => None,
_ => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
};
NetworkConfigEntry {
zone: args.zone.expect("Please specifiy a zone with the -z flag"),
capacity,
tag: args.tag.unwrap_or_default(),
}
}
Some(old) => {
let capacity = match args.capacity {
Some(c) => Some(c),
None if args.gateway => None,
_ => old.capacity,
};
NetworkConfigEntry {
zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
capacity,
tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
}
}
};
config.members.insert(added_node, new_entry);
config.version += 1;
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
.await??;
Ok(())
}
pub async fn cmd_remove(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?;
if !args.yes {
return Err(Error::Message(format!(
"Add the flag --yes to really remove {:?} from the cluster",
deleted_node
)));
}
config.members.remove(&deleted_node);
config.version += 1;
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
.await??;
Ok(())
}
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::BucketList(bl) => {
println!("List of buckets:");
for bucket in bl {
println!("{}", bucket);
}
}
AdminRpc::BucketInfo(bucket) => {
print_bucket_info(&bucket);
}
AdminRpc::KeyList(kl) => {
println!("List of keys:");
for key in kl {
println!("{}\t{}", key.0, key.1);
}
}
AdminRpc::KeyInfo(key) => {
print_key_info(&key);
}
r => {
error!("Unexpected response: {:?}", r);
}
}
Ok(())
}
// --- Utility functions ----
fn print_key_info(key: &Key) {
println!("Key name: {}", key.name.get());
println!("Key ID: {}", key.key_id);
println!("Secret key: {}", key.secret_key);
if key.deleted.get() {
println!("Key is deleted.");
} else {
println!("Authorized buckets:");
for (b, _, perm) in key.authorized_buckets.items().iter() {
println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write);
}
}
}
fn print_bucket_info(bucket: &Bucket) {
println!("Bucket name: {}", bucket.name);
match bucket.state.get() {
BucketState::Deleted => println!("Bucket is deleted."),
BucketState::Present(p) => {
println!("Authorized keys:");
for (k, _, perm) in p.authorized_keys.items().iter() {
println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write);
}
println!("Website access: {}", p.website.get());
}
};
}
fn format_table(data: Vec<String>) {
let data = data
.iter()
.map(|s| s.split('\t').collect::<Vec<_>>())
.collect::<Vec<_>>();
let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
let mut column_size = vec![0; columns];
let mut out = String::new();
for row in data.iter() {
for (i, col) in row.iter().enumerate() {
column_size[i] = std::cmp::max(column_size[i], col.chars().count());
}
}
for row in data.iter() {
for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
out.push_str(col);
(0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
}
out.push_str(&row[row.len() - 1]);
out.push('\n');
}
print!("{}", out);
}
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = Uuid>,
pattern: &str,
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0])
}
}

284
src/garage/cli/cmd.rs Normal file
View File

@ -0,0 +1,284 @@
use std::collections::HashSet;
use garage_util::error::*;
use garage_rpc::ring::*;
use garage_rpc::system::*;
use garage_rpc::*;
use crate::admin::*;
use crate::cli::*;
pub async fn cli_command_dispatch(
cmd: Command,
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
Command::Node(NodeOperation::Connect(connect_opt)) => {
cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
}
Command::Node(NodeOperation::Configure(configure_opt)) => {
cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
Command::Node(NodeOperation::Remove(remove_opt)) => {
cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
}
Command::Key(ko) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
}
Command::Repair(ro) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
_ => unreachable!(),
}
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
println!("==== HEALTHY NODES ====");
let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
healthy_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
));
} else {
healthy_nodes.push(format!(
"{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED",
id = adv.id,
h = adv.status.hostname,
addr = adv.addr,
));
}
}
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
let failure_case_1 = status.iter().any(|adv| !adv.is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
last_seen = "FIXME", // FIXME was (now_msec() - adv.last_seen) / 1000,
));
}
}
for (id, cfg) in config.members.iter() {
if !status_keys.contains(id) {
failed_nodes.push(format!(
"{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen",
id = id,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
));
}
}
format_table(failed_nodes);
}
Ok(())
}
pub async fn cmd_connect(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
println!("Success.");
Ok(())
}
r => Err(Error::BadRpc(format!("Unexpected response: {:?}", r))),
}
}
pub async fn cmd_configure(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?;
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not in current configuration",
replaced_node
)));
}
}
if args.capacity.is_some() && args.gateway {
return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
}
if args.capacity == Some(0) {
return Err(Error::Message("Invalid capacity value: 0".into()));
}
let new_entry = match config.members.get(&added_node) {
None => {
let capacity = match args.capacity {
Some(c) => Some(c),
None if args.gateway => None,
_ => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
};
NetworkConfigEntry {
zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?,
capacity,
tag: args.tag.unwrap_or_default(),
}
}
Some(old) => {
let capacity = match args.capacity {
Some(c) => Some(c),
None if args.gateway => None,
_ => old.capacity,
};
NetworkConfigEntry {
zone: args.zone.unwrap_or_else(|| old.zone.to_string()),
capacity,
tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
}
}
};
config.members.insert(added_node, new_entry);
config.version += 1;
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
.await??;
Ok(())
}
pub async fn cmd_remove(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?;
if !args.yes {
return Err(Error::Message(format!(
"Add the flag --yes to really remove {:?} from the cluster",
deleted_node
)));
}
config.members.remove(&deleted_node);
config.version += 1;
rpc_cli
.call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
.await??;
Ok(())
}
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::BucketList(bl) => {
println!("List of buckets:");
for bucket in bl {
println!("{}", bucket);
}
}
AdminRpc::BucketInfo(bucket) => {
print_bucket_info(&bucket);
}
AdminRpc::KeyList(kl) => {
println!("List of keys:");
for key in kl {
println!("{}\t{}", key.0, key.1);
}
}
AdminRpc::KeyInfo(key) => {
print_key_info(&key);
}
r => {
error!("Unexpected response: {:?}", r);
}
}
Ok(())
}
// --- Utility functions ----

65
src/garage/cli/init.rs Normal file
View File

@ -0,0 +1,65 @@
use std::path::PathBuf;
use garage_util::error::*;
pub const READ_KEY_ERROR: &str = "Unable to read node key. It will be generated by your garage node the first time is it launched. Ensure that your garage node is currently running. (The node key is supposed to be stored in your metadata directory.)";
pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
let config = garage_util::config::read_config(config_file.clone()).err_context(format!(
"Unable to read configuration file {}",
config_file.to_string_lossy(),
))?;
let node_id =
garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?;
let idstr = if let Some(addr) = config.rpc_public_addr {
let idstr = format!("{}@{}", hex::encode(&node_id), addr);
println!("{}", idstr);
idstr
} else {
let idstr = hex::encode(&node_id);
println!("{}", idstr);
if !quiet {
eprintln!("WARNING: I don't know the public address to reach this node.");
eprintln!("In all of the instructions below, replace 127.0.0.1:3901 by the appropriate address and port.");
}
format!("{}@127.0.0.1:3901", idstr)
};
if !quiet {
eprintln!();
eprintln!(
"To instruct a node to connect to this node, run the following command on that node:"
);
eprintln!(" garage [-c <config file path>] node connect {}", idstr);
eprintln!();
eprintln!("Or instruct them to connect from here by running:");
eprintln!(
" garage -c {} -h <remote node> node connect {}",
config_file.to_string_lossy(),
idstr
);
eprintln!(
"where <remote_node> is their own node identifier in the format: <pubkey>@<ip>:<port>"
);
eprintln!();
eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:");
eprintln!(" bootstrap_peers = [");
eprintln!(" \"{}\",", idstr);
eprintln!(" ...");
eprintln!(" ]");
eprintln!();
eprintln!(
r#"Security notice: Garage's intra-cluster communications are secured primarily by the shared
secret value rpc_secret. However, an attacker that knows rpc_secret (for example if it
leaks) cannot connect if they do not know any of the identifiers of the nodes in the
cluster. It is thus a good security measure to try to keep them secret if possible.
"#
);
}
Ok(())
}

9
src/garage/cli/mod.rs Normal file
View File

@ -0,0 +1,9 @@
pub(crate) mod cmd;
pub(crate) mod init;
pub(crate) mod structs;
pub(crate) mod util;
pub(crate) use cmd::*;
pub(crate) use init::*;
pub(crate) use structs::*;
pub(crate) use util::*;

296
src/garage/cli/structs.rs Normal file
View File

@ -0,0 +1,296 @@
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
#[derive(StructOpt, Debug)]
pub enum Command {
/// Run Garage server
#[structopt(name = "server")]
Server,
/// Print identifier (public key) of this garage node.
/// Generates a new keypair if necessary.
#[structopt(name = "node-id")]
NodeId(NodeIdOpt),
/// Get network status
#[structopt(name = "status")]
Status,
/// Garage node operations
#[structopt(name = "node")]
Node(NodeOperation),
/// Bucket operations
#[structopt(name = "bucket")]
Bucket(BucketOperation),
/// Key operations
#[structopt(name = "key")]
Key(KeyOperation),
/// Start repair of node data
#[structopt(name = "repair")]
Repair(RepairOpt),
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
}
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
/// Connect to Garage node that is currently isolated from the system
#[structopt(name = "connect")]
Connect(ConnectNodeOpt),
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureNodeOpt),
/// Remove Garage node from cluster
#[structopt(name = "remove")]
Remove(RemoveNodeOpt),
}
#[derive(StructOpt, Debug)]
pub struct NodeIdOpt {
/// Do not print usage instructions to stderr
#[structopt(short = "q", long = "quiet")]
pub(crate) quiet: bool,
}
#[derive(StructOpt, Debug)]
pub struct ConnectNodeOpt {
/// Node public key and address, in the format:
/// `<public key hexadecimal>@<ip or hostname>:<port>`
pub(crate) node: String,
}
#[derive(StructOpt, Debug)]
pub struct ConfigureNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
pub(crate) node_id: String,
/// Location (zone or datacenter) of the node
#[structopt(short = "z", long = "zone")]
pub(crate) zone: Option<String>,
/// Capacity (in relative terms, use 1 to represent your smallest server)
#[structopt(short = "c", long = "capacity")]
pub(crate) capacity: Option<u32>,
/// Gateway-only node
#[structopt(short = "g", long = "gateway")]
pub(crate) gateway: bool,
/// Optional node tag
#[structopt(short = "t", long = "tag")]
pub(crate) tag: Option<String>,
/// Replaced node(s): list of node IDs that will be removed from the current cluster
#[structopt(long = "replace")]
pub(crate) replace: Vec<String>,
}
#[derive(StructOpt, Debug)]
pub struct RemoveNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
pub(crate) node_id: String,
/// If this flag is not given, the node won't be removed
#[structopt(long = "yes")]
pub(crate) yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list")]
List,
/// Get bucket info
#[structopt(name = "info")]
Info(BucketOpt),
/// Create bucket
#[structopt(name = "create")]
Create(BucketOpt),
/// Delete bucket
#[structopt(name = "delete")]
Delete(DeleteBucketOpt),
/// Allow key to read or write to bucket
#[structopt(name = "allow")]
Allow(PermBucketOpt),
/// Deny key from reading or writing to bucket
#[structopt(name = "deny")]
Deny(PermBucketOpt),
/// Expose as website or not
#[structopt(name = "website")]
Website(WebsiteOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
pub allow: bool,
/// Delete
#[structopt(long = "deny")]
pub deny: bool,
/// Bucket name
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
/// If this flag is not given, the bucket won't be deleted
#[structopt(long = "yes")]
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
pub key_pattern: String,
/// Allow/deny read operations
#[structopt(long = "read")]
pub read: bool,
/// Allow/deny write operations
#[structopt(long = "write")]
pub write: bool,
/// Bucket name
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list")]
List,
/// Get key info
#[structopt(name = "info")]
Info(KeyOpt),
/// Create new key
#[structopt(name = "new")]
New(KeyNewOpt),
/// Rename key
#[structopt(name = "rename")]
Rename(KeyRenameOpt),
/// Delete key
#[structopt(name = "delete")]
Delete(KeyDeleteOpt),
/// Import key
#[structopt(name = "import")]
Import(KeyImportOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyOpt {
/// ID or name of the key
pub key_pattern: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(long = "name", default_value = "Unnamed key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
/// New name of the key
pub new_name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
/// Confirm deletion
#[structopt(long = "yes")]
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
/// Secret access key
pub secret_key: String,
/// Key name
#[structopt(short = "n", default_value = "Imported key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
pub yes: bool,
#[structopt(subcommand)]
pub what: Option<RepairWhat>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Only do a full sync of metadata tables
#[structopt(name = "tables")]
Tables,
/// Only repair (resync/rebalance) the set of stored blocks
#[structopt(name = "blocks")]
Blocks,
/// Only redo the propagation of object deletions to the version table (slow)
#[structopt(name = "versions")]
Versions,
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
#[structopt(name = "block_refs")]
BlockRefs,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
/// Gather detailed statistics (this can be long)
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}

83
src/garage/cli/util.rs Normal file
View File

@ -0,0 +1,83 @@
use garage_util::data::Uuid;
use garage_util::error::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
pub fn print_key_info(key: &Key) {
println!("Key name: {}", key.name.get());
println!("Key ID: {}", key.key_id);
println!("Secret key: {}", key.secret_key);
if key.deleted.get() {
println!("Key is deleted.");
} else {
println!("Authorized buckets:");
for (b, _, perm) in key.authorized_buckets.items().iter() {
println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write);
}
}
}
pub fn print_bucket_info(bucket: &Bucket) {
println!("Bucket name: {}", bucket.name);
match bucket.state.get() {
BucketState::Deleted => println!("Bucket is deleted."),
BucketState::Present(p) => {
println!("Authorized keys:");
for (k, _, perm) in p.authorized_keys.items().iter() {
println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write);
}
println!("Website access: {}", p.website.get());
}
};
}
pub fn format_table(data: Vec<String>) {
let data = data
.iter()
.map(|s| s.split('\t').collect::<Vec<_>>())
.collect::<Vec<_>>();
let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
let mut column_size = vec![0; columns];
let mut out = String::new();
for row in data.iter() {
for (i, col) in row.iter().enumerate() {
column_size[i] = std::cmp::max(column_size[i], col.chars().count());
}
}
for row in data.iter() {
for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
out.push_str(col);
(0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
}
out.push_str(&row[row.len() - 1]);
out.push('\n');
}
print!("{}", out);
}
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = Uuid>,
pattern: &str,
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0])
}
}

View File

@ -4,7 +4,7 @@
#[macro_use]
extern crate log;
mod admin_rpc;
mod admin;
mod cli;
mod repair;
mod server;
@ -16,12 +16,12 @@ use structopt::StructOpt;
use netapp::util::parse_and_resolve_peer_addr;
use netapp::NetworkKey;
use garage_util::error::Error;
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use admin_rpc::*;
use admin::*;
use cli::*;
#[derive(StructOpt, Debug)]
@ -66,7 +66,7 @@ async fn main() {
};
if let Err(e) = res {
error!("{}", e);
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
@ -74,7 +74,7 @@ async fn main() {
async fn cli_command(opt: Opt) -> Result<(), Error> {
let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() {
Some(garage_util::config::read_config(opt.config_file.clone())
.map_err(|e| Error::Message(format!("Unable to read configuration file {}: {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy(), e)))?)
.err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?)
} else {
None
};
@ -84,11 +84,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
.rpc_secret
.as_ref()
.or_else(|| config.as_ref().map(|c| &c.rpc_secret))
.expect("No RPC secret provided");
.ok_or("No RPC secret provided")?;
let network_key = NetworkKey::from_slice(
&hex::decode(net_key_hex_str).expect("Invalid RPC secret key (bad hex)")[..],
&hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..],
)
.expect("Invalid RPC secret provided (wrong length)");
.ok_or("Invalid RPC secret provided (wrong length)")?;
// Generate a temporary keypair for our RPC client
let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
@ -97,77 +97,22 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Find and parse the address of the target host
let (id, addr) = if let Some(h) = opt.rpc_host {
let (id, addrs) = parse_and_resolve_peer_addr(&h).expect("Invalid RPC host");
let (id, addrs) = parse_and_resolve_peer_addr(&h).ok_or_else(|| format!("Invalid RPC remote node identifier: {}. Expected format is <pubkey>@<IP or hostname>:<port>.", h))?;
(id, addrs[0])
} else if let Some(a) = config.as_ref().map(|c| c.rpc_public_addr).flatten() {
let node_key = garage_rpc::system::gen_node_key(&config.unwrap().metadata_dir)
.map_err(|e| Error::Message(format!("Unable to read or generate node key: {}", e)))?;
(node_key.public_key(), a)
let node_id = garage_rpc::system::read_node_id(&config.unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
(node_id, a)
} else {
return Err(Error::Message("No RPC host provided".into()));
};
// Connect to target host
netapp.clone().try_connect(addr, id).await?;
netapp.clone().try_connect(addr, id).await
.err_context("Unable to connect to destination RPC host. Check that you are using the same value of rpc_secret as them, and that you have their correct public key.")?;
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
cli_cmd(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await
}
fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
let config = garage_util::config::read_config(config_file.clone()).map_err(|e| {
Error::Message(format!(
"Unable to read configuration file {}: {}",
config_file.to_string_lossy(),
e
))
})?;
let node_key = garage_rpc::system::gen_node_key(&config.metadata_dir)
.map_err(|e| Error::Message(format!("Unable to read or generate node key: {}", e)))?;
let idstr = if let Some(addr) = config.rpc_public_addr {
let idstr = format!("{}@{}", hex::encode(&node_key.public_key()), addr);
println!("{}", idstr);
idstr
} else {
let idstr = hex::encode(&node_key.public_key());
println!("{}", idstr);
if !quiet {
eprintln!("WARNING: I don't know the public address to reach this node.");
eprintln!("In all of the instructions below, replace 127.0.0.1:3901 by the appropriate address and port.");
}
format!("{}@127.0.0.1:3901", idstr)
};
if !quiet {
eprintln!();
eprintln!(
"To instruct a node to connect to this node, run the following command on that node:"
);
eprintln!(" garage [-c <config file path>] node connect {}", idstr);
eprintln!();
eprintln!("Or instruct them to connect from here by running:");
eprintln!(
" garage -c {} -h <remote node> node connect {}",
config_file.to_string_lossy(),
idstr
);
eprintln!(
"where <remote_node> is their own node identifier in the format: <pubkey>@<ip>:<port>"
);
eprintln!();
eprintln!("This node identifier can also be added as a bootstrap node in other node's garage.toml files:");
eprintln!(" bootstrap_peers = [");
eprintln!(" \"{}\",", idstr);
eprintln!(" ...");
eprintln!(" ]");
eprintln!();
}
Ok(())
cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await
}

View File

@ -10,7 +10,7 @@ use garage_api::run_api_server;
use garage_model::garage::Garage;
use garage_web::run_web_server;
use crate::admin_rpc::*;
use crate::admin::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {

View File

@ -203,7 +203,7 @@ impl RpcHelper {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
Err(Error::TooManyErrors(errors))
Err(Error::Quorum(results.len(), to.len(), errors))
}
}
}

View File

@ -24,7 +24,7 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
use garage_util::data::Uuid;
use garage_util::error::Error;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
@ -39,6 +39,8 @@ const PING_TIMEOUT: Duration = Duration::from_secs(2);
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@ -113,6 +115,22 @@ pub struct KnownNodeInfo {
pub status: NodeStatus,
}
pub fn read_node_id(metadata_dir: &Path) -> Result<NodeID, Error> {
let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub");
let mut f = std::fs::File::open(pubkey_file.as_path())?;
let mut d = vec![];
f.read_to_end(&mut d)?;
if d.len() != 32 {
return Err(Error::Message("Corrupt node_key.pub file".to_string()));
}
let mut key = [0u8; 32];
key.copy_from_slice(&d[..]);
Ok(NodeID::from_slice(&key[..]).unwrap())
}
pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
let mut key_file = metadata_dir.to_path_buf();
key_file.push("node_key");
@ -128,10 +146,30 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
key.copy_from_slice(&d[..]);
Ok(NodeKey::from_slice(&key[..]).unwrap())
} else {
let (_, key) = ed25519::gen_keypair();
if !metadata_dir.exists() {
info!("Metadata directory does not exist, creating it.");
std::fs::create_dir(&metadata_dir)?;
}
info!("Generating new node key pair.");
let (pubkey, key) = ed25519::gen_keypair();
{
use std::os::unix::fs::PermissionsExt;
let mut f = std::fs::File::create(key_file.as_path())?;
let mut perm = f.metadata()?.permissions();
perm.set_mode(0o600);
std::fs::set_permissions(key_file.as_path(), perm)?;
f.write_all(&key[..])?;
}
{
let mut pubkey_file = metadata_dir.to_path_buf();
pubkey_file.push("node_key.pub");
let mut f2 = std::fs::File::create(pubkey_file.as_path())?;
f2.write_all(&pubkey[..])?;
}
let mut f = std::fs::File::create(key_file.as_path())?;
f.write_all(&key[..])?;
Ok(key)
}
}
@ -252,7 +290,7 @@ impl System {
rpc_public_addr,
)
.await
.map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e)))
.err_context("Error while publishing Consul service")
}
/// Save network configuration to disc
@ -282,7 +320,13 @@ impl System {
})?;
let mut errors = vec![];
for ip in addrs.iter() {
match self.netapp.clone().try_connect(*ip, pubkey).await {
match self
.netapp
.clone()
.try_connect(*ip, pubkey)
.await
.err_context(CONNECT_ERROR_MESSAGE)
{
Ok(()) => return Ok(SystemRpc::Ok),
Err(e) => {
errors.push((*ip, e));
@ -445,7 +489,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id));
tokio::spawn(
self.netapp
.clone()
.try_connect(node_addr, node_id)
.map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
);
}
}

View File

@ -47,8 +47,13 @@ pub enum Error {
#[error(display = "Timeout")]
Timeout,
#[error(display = "Too many errors: {:?}", _0)]
TooManyErrors(Vec<String>),
#[error(
display = "Could not reach quorum. {} of {} request succeeded, others returned errors: {:?}",
_0,
_1,
_2
)]
Quorum(usize, usize, Vec<String>),
#[error(display = "Bad RPC: {}", _0)]
BadRpc(String),
@ -81,6 +86,39 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
}
}
impl<'a> From<&'a str> for Error {
fn from(v: &'a str) -> Error {
Error::Message(v.to_string())
}
}
impl From<String> for Error {
fn from(v: String) -> Error {
Error::Message(v)
}
}
pub trait ErrorContext<T, E> {
fn err_context<C: std::borrow::Borrow<str>>(self, ctx: C) -> Result<T, Error>;
}
impl<T, E> ErrorContext<T, E> for Result<T, E>
where
E: std::fmt::Display,
{
#[inline]
fn err_context<C: std::borrow::Borrow<str>>(self, ctx: C) -> Result<T, Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(Error::Message(format!(
"{}\nOriginal error: {}",
ctx.borrow(),
e
))),
}
}
}
// Custom serialization for our error type, for use in RPC.
// Errors are serialized as a string of their Display representation.
// Upon deserialization, they all become a RemoteError with the

View File

@ -39,7 +39,7 @@ impl Error {
Error::NotFound => StatusCode::NOT_FOUND,
Error::ApiError(e) => e.http_status_code(),
Error::InternalError(
GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_),
GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(_, _, _),
) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::BAD_REQUEST,