WIP: Automatically create node layout, keys and buckets #883

Draft
apapsch wants to merge 6 commits from apapsch/garage:feature/auto-config into main
8 changed files with 214 additions and 175 deletions
Showing only changes of commit b185ec2f85 - Show all commits

View file

@ -279,7 +279,8 @@ impl DataLayout {
u16::from_be_bytes([ u16::from_be_bytes([
hash.as_slice()[HASH_DRIVE_BYTES.0], hash.as_slice()[HASH_DRIVE_BYTES.0],
hash.as_slice()[HASH_DRIVE_BYTES.1], hash.as_slice()[HASH_DRIVE_BYTES.1],
]) as usize % DRIVE_NPART ]) as usize
% DRIVE_NPART
} }
fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {

View file

@ -1,5 +1,8 @@
use crate::admin::AdminRpc; use crate::admin::AdminRpc;
use crate::cli::{cmd_apply_layout, cmd_assign_role, fetch_layout, fetch_status, ApplyLayoutOpt, AssignRoleOpt, BucketOperation, BucketOpt, KeyImportOpt, KeyInfoOpt, KeyOperation, PermBucketOpt}; use crate::cli::{
cmd_apply_layout, cmd_assign_role, fetch_layout, fetch_status, ApplyLayoutOpt, AssignRoleOpt,
BucketOperation, BucketOpt, KeyImportOpt, KeyInfoOpt, KeyOperation, PermBucketOpt,
};
use bytesize::ByteSize; use bytesize::ByteSize;
use garage_model::helper::error::Error as HelperError; use garage_model::helper::error::Error as HelperError;
use garage_net::endpoint::Endpoint; use garage_net::endpoint::Endpoint;
@ -12,167 +15,186 @@ use garage_util::data::Uuid;
use garage_util::error::Error; use garage_util::error::Error;
pub async fn key_exists( pub async fn key_exists(
rpc_cli: &Endpoint<AdminRpc, ()>, rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
key_pattern: String, key_pattern: String,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, AdminRpc::KeyOperation( .call(
KeyOperation::Info(KeyInfoOpt{ &rpc_host,
key_pattern, AdminRpc::KeyOperation(KeyOperation::Info(KeyInfoOpt {
show_secret: false, key_pattern,
})), PRIO_NORMAL) show_secret: false,
.await? })),
{ PRIO_NORMAL,
Ok(_) => Ok(true), )
Err(HelperError::BadRequest(_)) => Ok(false), .await?
resp => Err(Error::unexpected_rpc_message(resp)), {
} Ok(_) => Ok(true),
Err(HelperError::BadRequest(_)) => Ok(false),
resp => Err(Error::unexpected_rpc_message(resp)),
}
} }
pub async fn bucket_exists( pub async fn bucket_exists(
rpc_cli: &Endpoint<AdminRpc, ()>, rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
name: String, name: String,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation( .call(
BucketOperation::Info(BucketOpt{name}) &rpc_host,
), PRIO_NORMAL) AdminRpc::BucketOperation(BucketOperation::Info(BucketOpt { name })),
.await? PRIO_NORMAL,
{ )
Ok(_) => Ok(true), .await?
Err(HelperError::BadRequest(_)) => Ok(false), {
resp => Err(Error::unexpected_rpc_message(resp)), Ok(_) => Ok(true),
} Err(HelperError::BadRequest(_)) => Ok(false),
resp => Err(Error::unexpected_rpc_message(resp)),
}
} }
pub async fn key_create( pub async fn key_create(
rpc_cli: &Endpoint<AdminRpc, ()>, rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
params: &AutoKey, params: &AutoKey,
) -> Result<(), Error> { ) -> Result<(), Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, AdminRpc::KeyOperation( .call(
KeyOperation::Import(KeyImportOpt{ &rpc_host,
name: params.name.clone(), AdminRpc::KeyOperation(KeyOperation::Import(KeyImportOpt {
secret_key: params.secret.clone(), name: params.name.clone(),
key_id: params.id.clone(), secret_key: params.secret.clone(),
yes: true, key_id: params.id.clone(),
}) yes: true,
), PRIO_NORMAL).await? })),
{ PRIO_NORMAL,
Ok(_) => Ok(()), )
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)), .await?
resp => Err(Error::unexpected_rpc_message(resp)) {
} Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp)),
}
} }
pub async fn bucket_create( pub async fn bucket_create(
rpc_cli: &Endpoint<AdminRpc, ()>, rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
params: &AutoBucket, params: &AutoBucket,
) -> Result<(), Error> { ) -> Result<(), Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation( .call(
BucketOperation::Create(BucketOpt{name: params.name.clone()}) &rpc_host,
), PRIO_NORMAL) AdminRpc::BucketOperation(BucketOperation::Create(BucketOpt {
.await? name: params.name.clone(),
{ })),
Ok(_) => Ok(()), PRIO_NORMAL,
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)), )
resp => Err(Error::unexpected_rpc_message(resp)) .await?
} {
Ok(_) => Ok(()),
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp)),
}
} }
pub async fn grant_permission( pub async fn grant_permission(
rpc_cli: &Endpoint<AdminRpc, ()>, rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
bucket_name: String, bucket_name: String,
perm: &AutoPermission, perm: &AutoPermission,
) -> Result<(), Error> { ) -> Result<(), Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, AdminRpc::BucketOperation( .call(
BucketOperation::Allow(PermBucketOpt{ &rpc_host,
key_pattern: perm.key.clone(), AdminRpc::BucketOperation(BucketOperation::Allow(PermBucketOpt {
read: perm.read, key_pattern: perm.key.clone(),
write: perm.write, read: perm.read,
owner: perm.owner, write: perm.write,
bucket: bucket_name, owner: perm.owner,
}) bucket: bucket_name,
), PRIO_NORMAL) })),
.await? PRIO_NORMAL,
{ )
Ok(_) => Ok(()), .await?
Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)), {
resp => Err(Error::unexpected_rpc_message(resp)) Ok(_) => Ok(()),
} Err(HelperError::BadRequest(msg)) => Err(Error::Message(msg)),
resp => Err(Error::unexpected_rpc_message(resp)),
}
} }
pub async fn get_unassigned_nodes( pub async fn get_unassigned_nodes(
rpc_cli: &Endpoint<SystemRpc, ()>, rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
) -> Result<Option<Vec<Uuid>>, Error> { ) -> Result<Option<Vec<Uuid>>, Error> {
let status = fetch_status(rpc_cli, rpc_host).await?; let status = fetch_status(rpc_cli, rpc_host).await?;
let layout = fetch_layout(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut nodes: Vec<Uuid> = Vec::new(); let mut nodes: Vec<Uuid> = Vec::new();
for adv in status.iter().filter(|adv| adv.is_up) { for adv in status.iter().filter(|adv| adv.is_up) {
if layout.current().roles.get(&adv.id).is_none() { if layout.current().roles.get(&adv.id).is_none() {
let prev_role = layout let prev_role = layout
.versions .versions
.iter() .iter()
.rev() .rev()
.find_map(|x| match x.roles.get(&adv.id) { .find_map(|x| match x.roles.get(&adv.id) {
Some(NodeRoleV(Some(cfg))) => Some(cfg), Some(NodeRoleV(Some(cfg))) => Some(cfg),
_ => None, _ => None,
}); });
if prev_role.is_none() { if prev_role.is_none() {
if let Some(NodeRoleV(Some(_))) = layout.staging.get().roles.get(&adv.id) { if let Some(NodeRoleV(Some(_))) = layout.staging.get().roles.get(&adv.id) {
// Node role assignment is pending, can return immediately. // Node role assignment is pending, can return immediately.
return Ok(None); return Ok(None);
} else { } else {
nodes.push(adv.id.clone()); nodes.push(adv.id.clone());
} }
} }
} else { } else {
// Node role is assigned, can return immediately. // Node role is assigned, can return immediately.
return Ok(None); return Ok(None);
} }
} }
// Encountered no node with an assignment (pending or applied). // Encountered no node with an assignment (pending or applied).
// Therefore, all nodes are unassigned. // Therefore, all nodes are unassigned.
Ok(Some(nodes)) Ok(Some(nodes))
} }
pub async fn assign_node_layout( pub async fn assign_node_layout(
rpc_cli: &Endpoint<SystemRpc, ()>, rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
unassigned_nodes: &Vec<Uuid>, unassigned_nodes: &Vec<Uuid>,
auto_nodes: &Vec<AutoNode>, auto_nodes: &Vec<AutoNode>,
) -> Result<(), Error> { ) -> Result<(), Error> {
if unassigned_nodes.len() != auto_nodes.len() { if unassigned_nodes.len() != auto_nodes.len() {
return Err(Error::Message("Cannot apply auto layout: configured nodes do not match actual nodes".to_string())); return Err(Error::Message(
} "Cannot apply auto layout: configured nodes do not match actual nodes".to_string(),
));
}
for (i, node_id) in unassigned_nodes.iter().enumerate() { for (i, node_id) in unassigned_nodes.iter().enumerate() {
if let Some(auto) = auto_nodes.get(i) { if let Some(auto) = auto_nodes.get(i) {
let capacity = auto.capacity.parse::<ByteSize>()?; let capacity = auto.capacity.parse::<ByteSize>()?;
cmd_assign_role(rpc_cli, rpc_host, AssignRoleOpt{ cmd_assign_role(
node_ids: vec![format!("{id:?}", id=node_id)], rpc_cli,
zone: Some(auto.zone.clone()), rpc_host,
capacity: Some(capacity), AssignRoleOpt {
gateway: false, node_ids: vec![format!("{id:?}", id = node_id)],
tags: vec![], zone: Some(auto.zone.clone()),
replace: vec![], capacity: Some(capacity),
}).await?; gateway: false,
} tags: vec![],
} replace: vec![],
},
)
.await?;
}
}
cmd_apply_layout(rpc_cli, rpc_host, ApplyLayoutOpt{ cmd_apply_layout(rpc_cli, rpc_host, ApplyLayoutOpt { version: Some(1) }).await?;
version: Some(1),
}).await?;
Ok(()) Ok(())
} }

View file

@ -19,7 +19,7 @@ pub async fn cli_command_dispatch(
system_rpc_endpoint: &Endpoint<SystemRpc, ()>, system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>, admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
config: &Config, config: &Config,
) -> Result<(), HelperError> { ) -> Result<(), HelperError> {
match cmd { match cmd {
Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?), Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?),
@ -46,9 +46,15 @@ pub async fn cli_command_dispatch(
Command::Meta(mo) => { Command::Meta(mo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
} }
Command::Auto => { Command::Auto => {
cmd_auto(admin_rpc_endpoint, system_rpc_endpoint, rpc_host, config.auto.as_ref()).await cmd_auto(
} admin_rpc_endpoint,
system_rpc_endpoint,
rpc_host,
config.auto.as_ref(),
)
.await
}
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -275,9 +281,8 @@ pub async fn cmd_auto(
rpc_host: NodeID, rpc_host: NodeID,
config: Option<&AutoConfig>, config: Option<&AutoConfig>,
) -> Result<(), HelperError> { ) -> Result<(), HelperError> {
match config { match config {
Some(auto) => { Some(auto) => {
// Assign cluster layout if all nodes are unassigned. // Assign cluster layout if all nodes are unassigned.
// This is to ensure a newly created cluster is readily available. // This is to ensure a newly created cluster is readily available.
// Further changes to the cluster layout must be done manually. // Further changes to the cluster layout must be done manually.
@ -286,12 +291,12 @@ pub async fn cmd_auto(
} }
// Import keys // Import keys
for key in auto.keys.iter() { for key in auto.keys.iter() {
let exists = key_exists(rpc_admin, rpc_host, key.id.clone()).await?; let exists = key_exists(rpc_admin, rpc_host, key.id.clone()).await?;
if !exists { if !exists {
key_create(rpc_admin, rpc_host, key).await?; key_create(rpc_admin, rpc_host, key).await?;
} }
} }
// Import buckets // Import buckets
for bucket in auto.buckets.iter() { for bucket in auto.buckets.iter() {
@ -305,12 +310,14 @@ pub async fn cmd_auto(
grant_permission(rpc_admin, rpc_host, bucket.name.clone(), perm).await?; grant_permission(rpc_admin, rpc_host, bucket.name.clone(), perm).await?;
} }
} }
} }
_ => { _ => {
return Err(HelperError::BadRequest("Auto configuration is missing".to_string())) return Err(HelperError::BadRequest(
} "Auto configuration is missing".to_string(),
} ))
Ok(()) }
}
Ok(())
} }
// ---- utility ---- // ---- utility ----
@ -326,4 +333,4 @@ pub async fn fetch_status(
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
resp => Err(Error::unexpected_rpc_message(resp)), resp => Err(Error::unexpected_rpc_message(resp)),
} }
} }

View file

@ -1,15 +1,15 @@
pub(crate) mod auto;
pub(crate) mod cmd; pub(crate) mod cmd;
pub(crate) mod init; pub(crate) mod init;
pub(crate) mod layout; pub(crate) mod layout;
pub(crate) mod structs; pub(crate) mod structs;
pub(crate) mod util; pub(crate) mod util;
pub(crate) mod auto;
pub(crate) mod convert_db; pub(crate) mod convert_db;
pub(crate) use auto::*;
pub(crate) use cmd::*; pub(crate) use cmd::*;
pub(crate) use init::*; pub(crate) use init::*;
pub(crate) use layout::*; pub(crate) use layout::*;
pub(crate) use structs::*; pub(crate) use structs::*;
pub(crate) use util::*; pub(crate) use util::*;
pub(crate) use auto::*;

View file

@ -60,8 +60,8 @@ pub enum Command {
#[structopt(name = "convert-db", version = garage_version())] #[structopt(name = "convert-db", version = garage_version())]
ConvertDb(convert_db::ConvertDbOpt), ConvertDb(convert_db::ConvertDbOpt),
/// Create preconfigured keys, buckets and node layout. /// Create preconfigured keys, buckets and node layout.
Auto, Auto,
} }
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]

View file

@ -284,7 +284,15 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id, config.as_ref().unwrap()).await { match cli_command_dispatch(
opt.cmd,
&system_rpc_endpoint,
&admin_rpc_endpoint,
id,
config.as_ref().unwrap(),
)
.await
{
Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))), Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
Err(HelperError::BadRequest(b)) => Err(Error::Message(b)), Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
Err(e) => Err(Error::Message(format!("{}", e))), Err(e) => Err(Error::Message(format!("{}", e))),

View file

@ -279,7 +279,8 @@ impl<'a> LockedHelper<'a> {
.local_aliases .local_aliases
.get(alias_name) .get(alias_name)
.cloned() .cloned()
.flatten() != Some(bucket_id) .flatten()
!= Some(bucket_id)
{ {
return Err(GarageError::Message(format!( return Err(GarageError::Message(format!(
"Bucket {:?} does not have alias {} in namespace of key {}", "Bucket {:?} does not have alias {} in namespace of key {}",

View file

@ -129,8 +129,8 @@ pub struct Config {
#[serde(default = "Default::default")] #[serde(default = "Default::default")]
pub admin: AdminConfig, pub admin: AdminConfig,
/// Configuration to apply automatically /// Configuration to apply automatically
pub auto: Option<AutoConfig>, pub auto: Option<AutoConfig>,
} }
/// Value for data_dir: either a single directory or a list of dirs with attributes /// Value for data_dir: either a single directory or a list of dirs with attributes
@ -205,10 +205,10 @@ pub struct AdminConfig {
#[derive(Deserialize, Debug, Clone, Default)] #[derive(Deserialize, Debug, Clone, Default)]
pub struct AutoConfig { pub struct AutoConfig {
/// Buckets to create automatically /// Buckets to create automatically
pub buckets: Vec<AutoBucket>, pub buckets: Vec<AutoBucket>,
/// Keys to create automatically /// Keys to create automatically
pub keys: Vec<AutoKey>, pub keys: Vec<AutoKey>,
/// Node layout to create automatically /// Node layout to create automatically
pub nodes: Vec<AutoNode>, pub nodes: Vec<AutoNode>,
@ -218,27 +218,27 @@ pub struct AutoConfig {
#[derive(Deserialize, Debug, Clone, Default)] #[derive(Deserialize, Debug, Clone, Default)]
pub struct AutoKey { pub struct AutoKey {
/// Key name /// Key name
pub name: String, pub name: String,
/// Key ID starting with GK /// Key ID starting with GK
pub id: String, pub id: String,
/// Secret key /// Secret key
pub secret: String, pub secret: String,
} }
/// Bucket to create automatically /// Bucket to create automatically
#[derive(Deserialize, Debug, Clone, Default)] #[derive(Deserialize, Debug, Clone, Default)]
pub struct AutoBucket { pub struct AutoBucket {
/// Bucket name /// Bucket name
pub name: String, pub name: String,
/// Permissions to grant on bucket to given keys /// Permissions to grant on bucket to given keys
pub allow: Vec<AutoPermission>, pub allow: Vec<AutoPermission>,
} }
/// Permission to create automatically /// Permission to create automatically
#[derive(Deserialize, Debug, Clone, Default)] #[derive(Deserialize, Debug, Clone, Default)]
pub struct AutoPermission { pub struct AutoPermission {
/// Key ID or name /// Key ID or name
pub key: String, pub key: String,
/// Grant read permission /// Grant read permission
pub read: bool, pub read: bool,