WIP: Automatically create node layout, keys and buckets #883
6 changed files with 386 additions and 1 deletions
223
src/garage/cli/auto.rs
Normal file
223
src/garage/cli/auto.rs
Normal file
|
@ -0,0 +1,223 @@
|
|||
use crate::admin::AdminRpc;
|
||||
use crate::cli::{cmd_apply_layout, cmd_assign_role, fetch_layout, fetch_status, ApplyLayoutOpt, AssignRoleOpt, BucketOperation, BucketOpt, KeyImportOpt, KeyInfoOpt, KeyOperation, PermBucketOpt, WebsiteOpt};
|
||||
use bytesize::ByteSize;
|
||||
use garage_model::helper::error::Error as HelperError;
|
||||
use garage_net::endpoint::Endpoint;
|
||||
use garage_net::message::PRIO_NORMAL;
|
||||
use garage_net::NodeID;
|
||||
use garage_rpc::layout::NodeRoleV;
|
||||
use garage_rpc::system::SystemRpc;
|
||||
use garage_util::config::{AutoBucket, AutoBucketWebsite, AutoKey, AutoNode, AutoPermission, WebsiteAllowance};
|
||||
use garage_util::data::Uuid;
|
||||
use garage_util::error::Error;
|
||||
|
||||
pub async fn key_exists(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
key_pattern: String,
|
||||
) -> Result<bool, Error> {
|
||||
match rpc_cli
|
||||
.call(
|
||||
&rpc_host,
|
||||
AdminRpc::KeyOperation(KeyOperation::Info(KeyInfoOpt {
|
||||
key_pattern,
|
||||
show_secret: false,
|
||||
})),
|
||||
PRIO_NORMAL,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(HelperError::BadRequest(_)) => Ok(false),
|
||||
resp => Err(Error::unexpected_rpc_message(resp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bucket_exists(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
name: String,
|
||||
) -> Result<bool, Error> {
|
||||
match rpc_cli
|
||||
.call(
|
||||
&rpc_host,
|
||||
AdminRpc::BucketOperation(BucketOperation::Info(BucketOpt { name })),
|
||||
PRIO_NORMAL,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(HelperError::BadRequest(_)) => Ok(false),
|
||||
resp => Err(Error::unexpected_rpc_message(resp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn key_create(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
params: &AutoKey,
|
||||
) -> Result<(), Error> {
|
||||
match rpc_cli
|
||||
.call(
|
||||
&rpc_host,
|
||||
AdminRpc::KeyOperation(KeyOperation::Import(KeyImportOpt {
|
||||
name: params.name.clone(),
|
||||
secret_key: params.secret.clone(),
|
||||
key_id: params.id.clone(),
|
||||
yes: true,
|
||||
})),
|
||||
PRIO_NORMAL,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
|
||||
resp => Err(Error::unexpected_rpc_message(resp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bucket_create(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
params: &AutoBucket,
|
||||
) -> Result<(), Error> {
|
||||
match rpc_cli
|
||||
.call(
|
||||
&rpc_host,
|
||||
AdminRpc::BucketOperation(BucketOperation::Create(BucketOpt {
|
||||
name: params.name.clone(),
|
||||
})),
|
||||
PRIO_NORMAL,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
|
||||
resp => Err(Error::unexpected_rpc_message(resp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bucket_configure_website(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
bucket_name: String,
|
||||
website: &AutoBucketWebsite,
|
||||
) -> Result<(), Error> {
|
||||
match rpc_cli
|
||||
.call(
|
||||
&rpc_host,
|
||||
AdminRpc::BucketOperation(BucketOperation::Website(WebsiteOpt{
|
||||
allow: matches!(website.mode, WebsiteAllowance::Allow),
|
||||
deny: matches!(website.mode, WebsiteAllowance::Deny),
|
||||
bucket: bucket_name.clone(),
|
||||
index_document: website.index_document.clone(),
|
||||
error_document: website.error_document.clone(),
|
||||
})),
|
||||
PRIO_NORMAL,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
|
||||
resp => Err(Error::unexpected_rpc_message(resp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn grant_permission(
|
||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
bucket_name: String,
|
||||
perm: &AutoPermission,
|
||||
) -> Result<(), Error> {
|
||||
match rpc_cli
|
||||
.call(
|
||||
&rpc_host,
|
||||
AdminRpc::BucketOperation(BucketOperation::Allow(PermBucketOpt {
|
||||
key_pattern: perm.key.clone(),
|
||||
read: perm.read,
|
||||
write: perm.write,
|
||||
owner: perm.owner,
|
||||
bucket: bucket_name,
|
||||
})),
|
||||
PRIO_NORMAL,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
|
||||
resp => Err(Error::unexpected_rpc_message(resp)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_unassigned_nodes(
|
||||
rpc_cli: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
) -> Result<Option<Vec<Uuid>>, Error> {
|
||||
let status = fetch_status(rpc_cli, rpc_host).await?;
|
||||
let layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||
let mut nodes: Vec<Uuid> = Vec::new();
|
||||
|
||||
for adv in status.iter().filter(|adv| adv.is_up) {
|
||||
if layout.current().roles.get(&adv.id).is_none() {
|
||||
let prev_role = layout
|
||||
.versions
|
||||
.iter()
|
||||
.rev()
|
||||
.find_map(|x| match x.roles.get(&adv.id) {
|
||||
Some(NodeRoleV(Some(cfg))) => Some(cfg),
|
||||
_ => None,
|
||||
});
|
||||
if prev_role.is_none() {
|
||||
if let Some(NodeRoleV(Some(_))) = layout.staging.get().roles.get(&adv.id) {
|
||||
// Node role assignment is pending, can return immediately.
|
||||
return Ok(None);
|
||||
} else {
|
||||
nodes.push(adv.id.clone());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Node role is assigned, can return immediately.
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
// Encountered no node with an assignment (pending or applied).
|
||||
// Therefore, all nodes are unassigned.
|
||||
Ok(Some(nodes))
|
||||
}
|
||||
|
||||
pub async fn assign_node_layout(
|
||||
rpc_cli: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
unassigned_nodes: &Vec<Uuid>,
|
||||
auto_nodes: &Vec<AutoNode>,
|
||||
) -> Result<(), Error> {
|
||||
if unassigned_nodes.len() != auto_nodes.len() {
|
||||
return Err(Error::Message(
|
||||
"Cannot apply auto layout: configured nodes do not match actual nodes".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
for (i, node_id) in unassigned_nodes.iter().enumerate() {
|
||||
if let Some(auto) = auto_nodes.get(i) {
|
||||
let capacity = auto.capacity.parse::<ByteSize>()?;
|
||||
cmd_assign_role(
|
||||
rpc_cli,
|
||||
rpc_host,
|
||||
AssignRoleOpt {
|
||||
node_ids: vec![format!("{id:?}", id = node_id)],
|
||||
zone: Some(auto.zone.clone()),
|
||||
capacity: Some(capacity),
|
||||
gateway: false,
|
||||
tags: vec![],
|
||||
replace: vec![],
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
cmd_apply_layout(rpc_cli, rpc_host, ApplyLayoutOpt { version: Some(1) }).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};
|
|||
use std::time::Duration;
|
||||
|
||||
use format_table::format_table;
|
||||
use garage_util::config::{AutoConfig, Config};
|
||||
use garage_util::error::*;
|
||||
|
||||
use garage_rpc::layout::*;
|
||||
|
@ -18,6 +19,7 @@ pub async fn cli_command_dispatch(
|
|||
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
|
||||
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
config: &Config,
|
||||
) -> Result<(), HelperError> {
|
||||
match cmd {
|
||||
Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?),
|
||||
|
@ -44,6 +46,15 @@ pub async fn cli_command_dispatch(
|
|||
Command::Meta(mo) => {
|
||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
|
||||
}
|
||||
Command::Auto => {
|
||||
cmd_auto(
|
||||
admin_rpc_endpoint,
|
||||
system_rpc_endpoint,
|
||||
rpc_host,
|
||||
config.auto.as_ref(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
@ -264,6 +275,56 @@ pub async fn cmd_admin(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_auto(
|
||||
rpc_admin: &Endpoint<AdminRpc, ()>,
|
||||
rpc_system: &Endpoint<SystemRpc, ()>,
|
||||
rpc_host: NodeID,
|
||||
config: Option<&AutoConfig>,
|
||||
) -> Result<(), HelperError> {
|
||||
match config {
|
||||
Some(auto) => {
|
||||
// Assign cluster layout if all nodes are unassigned.
|
||||
// This is to ensure a newly created cluster is readily available.
|
||||
// Further changes to the cluster layout must be done manually.
|
||||
if let Some(nodes) = get_unassigned_nodes(rpc_system, rpc_host).await? {
|
||||
assign_node_layout(rpc_system, rpc_host, &nodes, auto.nodes.as_ref()).await?;
|
||||
}
|
||||
|
||||
// Import keys
|
||||
for key in auto.keys.iter() {
|
||||
let exists = key_exists(rpc_admin, rpc_host, key.id.clone()).await?;
|
||||
if !exists {
|
||||
key_create(rpc_admin, rpc_host, key).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Import buckets
|
||||
for bucket in auto.buckets.iter() {
|
||||
let exists = bucket_exists(rpc_admin, rpc_host, bucket.name.clone()).await?;
|
||||
if !exists {
|
||||
bucket_create(rpc_admin, rpc_host, bucket).await?;
|
||||
}
|
||||
|
||||
// Assign permissions to keys.
|
||||
for perm in bucket.allow.iter() {
|
||||
grant_permission(rpc_admin, rpc_host, bucket.name.clone(), perm).await?;
|
||||
}
|
||||
|
||||
// Configure website access.
|
||||
if let Some(website) = bucket.website.as_ref() {
|
||||
bucket_configure_website(rpc_admin, rpc_host, bucket.name.clone(), website).await?
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
return Err(HelperError::BadRequest(
|
||||
"Auto configuration is missing".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- utility ----
|
||||
|
||||
pub async fn fetch_status(
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
pub(crate) mod auto;
|
||||
pub(crate) mod cmd;
|
||||
pub(crate) mod init;
|
||||
pub(crate) mod layout;
|
||||
|
@ -6,6 +7,7 @@ pub(crate) mod util;
|
|||
|
||||
pub(crate) mod convert_db;
|
||||
|
||||
pub(crate) use auto::*;
|
||||
pub(crate) use cmd::*;
|
||||
pub(crate) use init::*;
|
||||
pub(crate) use layout::*;
|
||||
|
|
|
@ -59,6 +59,9 @@ pub enum Command {
|
|||
/// Convert metadata db between database engine formats
|
||||
#[structopt(name = "convert-db", version = garage_version())]
|
||||
ConvertDb(convert_db::ConvertDbOpt),
|
||||
|
||||
/// Create preconfigured keys, buckets and node layout.
|
||||
Auto,
|
||||
}
|
||||
|
||||
#[derive(StructOpt, Debug)]
|
||||
|
|
|
@ -284,7 +284,15 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
|
|||
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
|
||||
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
|
||||
|
||||
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
|
||||
match cli_command_dispatch(
|
||||
opt.cmd,
|
||||
&system_rpc_endpoint,
|
||||
&admin_rpc_endpoint,
|
||||
id,
|
||||
config.as_ref().unwrap(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
|
||||
Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
|
||||
Err(e) => Err(Error::Message(format!("{}", e))),
|
||||
|
|
|
@ -128,6 +128,9 @@ pub struct Config {
|
|||
/// Configuration for the admin API endpoint
|
||||
#[serde(default = "Default::default")]
|
||||
pub admin: AdminConfig,
|
||||
|
||||
/// Configuration to apply automatically
|
||||
pub auto: Option<AutoConfig>,
|
||||
}
|
||||
|
||||
/// Value for data_dir: either a single directory or a list of dirs with attributes
|
||||
|
@ -198,6 +201,91 @@ pub struct AdminConfig {
|
|||
pub trace_sink: Option<String>,
|
||||
}
|
||||
|
||||
/// Configuration to apply without manual intervention
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
pub struct AutoConfig {
|
||||
/// Buckets to create automatically
|
||||
pub buckets: Vec<AutoBucket>,
|
||||
|
||||
/// Keys to create automatically
|
||||
pub keys: Vec<AutoKey>,
|
||||
|
||||
/// Node layout to create automatically
|
||||
pub nodes: Vec<AutoNode>,
|
||||
}
|
||||
|
||||
/// Key to create automatically
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
pub struct AutoKey {
|
||||
/// Key name
|
||||
pub name: String,
|
||||
/// Key ID starting with GK
|
||||
pub id: String,
|
||||
/// Secret key
|
||||
pub secret: String,
|
||||
}
|
||||
|
||||
/// Bucket to create automatically
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
pub struct AutoBucket {
|
||||
/// Bucket name
|
||||
pub name: String,
|
||||
/// Permissions to grant on bucket to given keys
|
||||
pub allow: Vec<AutoPermission>,
|
||||
/// Website configuration
|
||||
pub website: Option<AutoBucketWebsite>
|
||||
}
|
||||
|
||||
fn default_index_document() -> String {
|
||||
"index.html".to_string()
|
||||
}
|
||||
|
||||
/// Bucket website configuration to create automatically
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
pub struct AutoBucketWebsite {
|
||||
/// Allow or deny (default) website access
|
||||
#[serde(default)]
|
||||
pub mode: WebsiteAllowance,
|
||||
/// Error document: the optional document returned when an error occurs
|
||||
pub error_document: Option<String>,
|
||||
/// Index document: the suffix appended to request paths ending by /
|
||||
#[serde(default = "default_index_document")]
|
||||
pub index_document: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum WebsiteAllowance {
|
||||
Allow,
|
||||
#[default]
|
||||
Deny,
|
||||
}
|
||||
|
||||
/// Permission to create automatically
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
pub struct AutoPermission {
|
||||
/// Key ID or name
|
||||
pub key: String,
|
||||
|
||||
/// Grant read permission
|
||||
pub read: bool,
|
||||
|
||||
/// Grant write permission
|
||||
pub write: bool,
|
||||
|
||||
/// Grant owner permission
|
||||
pub owner: bool,
|
||||
}
|
||||
|
||||
/// Node layout to create automatically
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
pub struct AutoNode {
|
||||
/// Zone name
|
||||
pub zone: String,
|
||||
/// Storage capacity, in bytes (or with suffix)
|
||||
pub capacity: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Default)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ConsulDiscoveryAPI {
|
||||
|
|
Loading…
Reference in a new issue