WIP: Automatically create node layout, keys and buckets #883

Draft
apapsch wants to merge 6 commits from apapsch/garage:feature/auto-config into main
4 changed files with 210 additions and 108 deletions
Showing only changes of commit 8b659015d7 - Show all commits

178
src/garage/cli/auto.rs Normal file
View file

@ -0,0 +1,178 @@
use crate::admin::AdminRpc;
use crate::cli::{cmd_apply_layout, cmd_assign_role, fetch_layout, fetch_status, ApplyLayoutOpt, AssignRoleOpt, BucketOperation, BucketOpt, KeyImportOpt, KeyInfoOpt, KeyOperation, PermBucketOpt};
use bytesize::ByteSize;
use garage_model::helper::error::Error as HelperError;
use garage_net::endpoint::Endpoint;
use garage_net::message::PRIO_NORMAL;
use garage_net::NodeID;
use garage_rpc::layout::NodeRoleV;
use garage_rpc::system::SystemRpc;
use garage_util::config::{AutoBucket, AutoKey, AutoNode, AutoPermission};
use garage_util::data::Uuid;
use garage_util::error::Error;
pub async fn key_exists(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
key_pattern: String,
) -> Result<bool, Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::KeyOperation(
KeyOperation::Info(KeyInfoOpt{
key_pattern,
show_secret: false,
})), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(true),
Err(HelperError::BadRequest(_)) => Ok(false),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn bucket_exists(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
name: String,
) -> Result<bool, Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation(
BucketOperation::Info(BucketOpt{name})
), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(true),
Err(HelperError::BadRequest(_)) => Ok(false),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn key_create(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
params: &AutoKey,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::KeyOperation(
KeyOperation::Import(KeyImportOpt{
name: params.name.clone(),
secret_key: params.secret.clone(),
key_id: params.id.clone(),
yes: true,
})
), PRIO_NORMAL).await?
{
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp))
}
}
pub async fn bucket_create(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
params: &AutoBucket,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation(
BucketOperation::Create(BucketOpt{name: params.name.clone()})
), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp))
}
}
pub async fn grant_permission(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
bucket_name: String,
perm: &AutoPermission,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation(
BucketOperation::Allow(PermBucketOpt{
key_pattern: perm.key.clone(),
read: perm.read,
write: perm.write,
owner: perm.owner,
bucket: bucket_name,
})
), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp))
}
}
pub async fn get_unassigned_nodes(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Option<Vec<Uuid>>, Error> {
let status = fetch_status(rpc_cli, rpc_host).await?;
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut nodes: Vec<Uuid> = Vec::new();
for adv in status.iter().filter(|adv| adv.is_up) {
if layout.current().roles.get(&adv.id).is_none() {
let prev_role = layout
.versions
.iter()
.rev()
.find_map(|x| match x.roles.get(&adv.id) {
Some(NodeRoleV(Some(cfg))) => Some(cfg),
_ => None,
});
if prev_role.is_none() {
if let Some(NodeRoleV(Some(_))) = layout.staging.get().roles.get(&adv.id) {
// Node role assignment is pending, can return immediately.
return Ok(None);
} else {
nodes.push(adv.id.clone());
}
}
} else {
// Node role is assigned, can return immediately.
return Ok(None);
}
}
// Encountered no node with an assignment (pending or applied).
// Therefore, all nodes are unassigned.
Ok(Some(nodes))
}
pub async fn assign_node_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
unassigned_nodes: &Vec<Uuid>,
auto_nodes: &Vec<AutoNode>,
) -> Result<(), Error> {
if unassigned_nodes.len() != auto_nodes.len() {
return Err(Error::Message("Cannot apply auto layout: configured nodes do not match actual nodes".to_string()));
}
for (i, node_id) in unassigned_nodes.iter().enumerate() {
if let Some(auto) = auto_nodes.get(i) {
let capacity = auto.capacity.parse::<ByteSize>()?;
cmd_assign_role(rpc_cli, rpc_host, AssignRoleOpt{
node_ids: vec![format!("{id:?}", id=node_id)],
zone: Some(auto.zone.clone()),
capacity: Some(capacity),
gateway: false,
tags: vec![],
replace: vec![],
}).await?;
}
}
cmd_apply_layout(rpc_cli, rpc_host, ApplyLayoutOpt{
version: Some(1),
}).await?;
Ok(())
}

View file

@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
use std::time::Duration;
use format_table::format_table;
use garage_util::config::{AutoBucket, AutoConfig, AutoKey, AutoPermission, Config};
use garage_util::config::{AutoConfig, Config};
use garage_util::error::*;
use garage_rpc::layout::*;
@ -47,7 +47,7 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
}
Command::Auto => {
cmd_auto(admin_rpc_endpoint, rpc_host, config.auto.as_ref()).await
cmd_auto(admin_rpc_endpoint, system_rpc_endpoint, rpc_host, config.auto.as_ref()).await
}
_ => unreachable!(),
}
@ -270,36 +270,44 @@ pub async fn cmd_admin(
}
pub async fn cmd_auto(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_admin: &Endpoint<AdminRpc, ()>,
rpc_system: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
config: Option<&AutoConfig>,
) -> Result<(), HelperError> {
match config {
Some(auto) => {
// Assign cluster layout if all nodes are unassigned.
// This is to ensure a newly created cluster is readily available.
// Further changes to the cluster layout must be done manually.
if let Some(nodes) = get_unassigned_nodes(rpc_system, rpc_host).await? {
assign_node_layout(rpc_system, rpc_host, &nodes, auto.nodes.as_ref()).await?;
}
// Import keys
for key in auto.keys.iter() {
let exists = key_exists(rpc_cli, rpc_host, key.id.clone()).await?;
let exists = key_exists(rpc_admin, rpc_host, key.id.clone()).await?;
if !exists {
key_create(rpc_cli, rpc_host, key).await?;
key_create(rpc_admin, rpc_host, key).await?;
}
}
// Import buckets
for bucket in auto.buckets.iter() {
let exists = bucket_exists(rpc_cli, rpc_host, bucket.name.clone()).await?;
let exists = bucket_exists(rpc_admin, rpc_host, bucket.name.clone()).await?;
if !exists {
bucket_create(rpc_cli, rpc_host, bucket).await?;
bucket_create(rpc_admin, rpc_host, bucket).await?;
}
// Assign permissions to keys.
for perm in bucket.allow.iter() {
grant_permission(rpc_cli, rpc_host, bucket.name.clone(), perm).await?;
grant_permission(rpc_admin, rpc_host, bucket.name.clone(), perm).await?;
}
}
}
_ => {
println!("Auto configuration is missing");
return Err(HelperError::BadRequest("Auto configuration is missing".to_string()))
}
}
Ok(())
@ -319,101 +327,3 @@ pub async fn fetch_status(
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn key_exists(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
key_pattern: String,
) -> Result<bool, Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::KeyOperation(
KeyOperation::Info(KeyInfoOpt{
key_pattern,
show_secret: false,
})), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(true),
Err(HelperError::BadRequest(_)) => Ok(false),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn bucket_exists(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
name: String,
) -> Result<bool, Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation(
BucketOperation::Info(BucketOpt{name})
), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(true),
Err(HelperError::BadRequest(_)) => Ok(false),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn key_create(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
params: &AutoKey,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::KeyOperation(
KeyOperation::Import(KeyImportOpt{
name: params.name.clone(),
secret_key: params.secret.clone(),
key_id: params.id.clone(),
yes: true,
})
), PRIO_NORMAL).await?
{
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp))
}
}
pub async fn bucket_create(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
params: &AutoBucket,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation(
BucketOperation::Create(BucketOpt{name: params.name.clone()})
), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp))
}
}
pub async fn grant_permission(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
bucket_name: String,
perm: &AutoPermission,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation(
BucketOperation::Allow(PermBucketOpt{
key_pattern: perm.key.clone(),
read: perm.read,
write: perm.write,
owner: perm.owner,
bucket: bucket_name,
})
), PRIO_NORMAL)
.await?
{
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp))
}
}

View file

@ -3,6 +3,7 @@ pub(crate) mod init;
pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod util;
pub(crate) mod auto;
pub(crate) mod convert_db;
@ -11,3 +12,4 @@ pub(crate) use init::*;
pub(crate) use layout::*;
pub(crate) use structs::*;
pub(crate) use util::*;
pub(crate) use auto::*;

View file

@ -208,6 +208,9 @@ pub struct AutoConfig {
/// Keys to automatically create on startup
pub keys: Vec<AutoKey>,
/// Node layout to automatically configure.
pub nodes: Vec<AutoNode>,
}
/// Key to create automatically
@ -241,6 +244,15 @@ pub struct AutoPermission {
pub owner: bool,
}
/// Node layout to create automatically
#[derive(Deserialize, Debug, Clone, Default)]
pub struct AutoNode {
/// Zone name
pub zone: String,
/// Storage capacity, in bytes (or with suffix)
pub capacity: String,
}
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConsulDiscoveryAPI {