New model for buckets #172
17 changed files with 137 additions and 66 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -474,6 +474,7 @@ version = "0.6.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"err-derive 0.3.0",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"garage_model 0.5.1",
|
"garage_model 0.5.1",
|
||||||
|
|
|
@ -4,6 +4,7 @@ use err_derive::Error;
|
||||||
use hyper::header::HeaderValue;
|
use hyper::header::HeaderValue;
|
||||||
use hyper::{HeaderMap, StatusCode};
|
use hyper::{HeaderMap, StatusCode};
|
||||||
|
|
||||||
|
use garage_model::helper::error::Error as HelperError;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
use crate::s3_xml;
|
use crate::s3_xml;
|
||||||
|
@ -83,6 +84,15 @@ impl From<quick_xml::de::DeError> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<HelperError> for Error {
|
||||||
|
fn from(err: HelperError) -> Self {
|
||||||
|
match err {
|
||||||
|
HelperError::Internal(i) => Self::InternalError(i),
|
||||||
|
HelperError::BadRequest(b) => Self::BadRequest(b),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
/// Get the HTTP status code that best represents the meaning of the error for the client
|
/// Get the HTTP status code that best represents the meaning of the error for the client
|
||||||
pub fn http_status_code(&self) -> StatusCode {
|
pub fn http_status_code(&self) -> StatusCode {
|
||||||
|
|
|
@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::{Error as GarageError, OkOrMessage};
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_table::replication::*;
|
use garage_table::replication::*;
|
||||||
|
@ -18,6 +18,7 @@ use garage_rpc::*;
|
||||||
use garage_model::bucket_alias_table::*;
|
use garage_model::bucket_alias_table::*;
|
||||||
use garage_model::bucket_table::*;
|
use garage_model::bucket_table::*;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::helper::error::{Error, OkOrBadRequest};
|
||||||
use garage_model::key_table::*;
|
use garage_model::key_table::*;
|
||||||
use garage_model::migrate::Migrate;
|
use garage_model::migrate::Migrate;
|
||||||
use garage_model::permission::*;
|
use garage_model::permission::*;
|
||||||
|
@ -91,7 +92,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&query.name)
|
.resolve_global_bucket_name(&query.name)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
|
|
||||||
let bucket = self
|
let bucket = self
|
||||||
.garage
|
.garage
|
||||||
|
@ -137,7 +138,7 @@ impl AdminRpcHandler {
|
||||||
let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
|
let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
|
||||||
Some(mut alias) => {
|
Some(mut alias) => {
|
||||||
if !alias.state.get().is_deleted() {
|
if !alias.state.get().is_deleted() {
|
||||||
return Err(Error::BadRpc(format!("Bucket {} already exists", name)));
|
return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
|
||||||
}
|
}
|
||||||
alias.state.update(Deletable::Present(AliasParams {
|
alias.state.update(Deletable::Present(AliasParams {
|
||||||
bucket_id: bucket.id,
|
bucket_id: bucket.id,
|
||||||
|
@ -145,7 +146,7 @@ impl AdminRpcHandler {
|
||||||
alias
|
alias
|
||||||
}
|
}
|
||||||
None => BucketAlias::new(name.clone(), bucket.id)
|
None => BucketAlias::new(name.clone(), bucket.id)
|
||||||
.ok_or_message(format!(INVALID_BUCKET_NAME_MESSAGE!(), name))?,
|
.ok_or_bad_request(format!(INVALID_BUCKET_NAME_MESSAGE!(), name))?,
|
||||||
};
|
};
|
||||||
bucket.state.as_option_mut().unwrap().aliases.merge_raw(
|
bucket.state.as_option_mut().unwrap().aliases.merge_raw(
|
||||||
name,
|
name,
|
||||||
|
@ -164,7 +165,7 @@ impl AdminRpcHandler {
|
||||||
.get(&EmptyKey, &query.name)
|
.get(&EmptyKey, &query.name)
|
||||||
.await?
|
.await?
|
||||||
.filter(|a| !a.is_deleted())
|
.filter(|a| !a.is_deleted())
|
||||||
.ok_or_message(format!("Bucket {} does not exist", query.name))?;
|
.ok_or_bad_request(format!("Bucket {} does not exist", query.name))?;
|
||||||
|
|
||||||
let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id;
|
let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id;
|
||||||
|
|
||||||
|
@ -182,7 +183,7 @@ impl AdminRpcHandler {
|
||||||
.filter(|(_, _, active)| *active)
|
.filter(|(_, _, active)| *active)
|
||||||
.any(|(name, _, _)| name != &query.name)
|
.any(|(name, _, _)| name != &query.name)
|
||||||
{
|
{
|
||||||
return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
|
return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
|
||||||
}
|
}
|
||||||
if bucket_state
|
if bucket_state
|
||||||
.local_aliases
|
.local_aliases
|
||||||
|
@ -190,7 +191,7 @@ impl AdminRpcHandler {
|
||||||
.iter()
|
.iter()
|
||||||
.any(|(_, _, active)| *active)
|
.any(|(_, _, active)| *active)
|
||||||
{
|
{
|
||||||
return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
|
return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check bucket is empty
|
// Check bucket is empty
|
||||||
|
@ -200,11 +201,14 @@ impl AdminRpcHandler {
|
||||||
.get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
|
.get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
|
||||||
.await?;
|
.await?;
|
||||||
if !objects.is_empty() {
|
if !objects.is_empty() {
|
||||||
return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name)));
|
return Err(Error::BadRequest(format!(
|
||||||
|
"Bucket {} is not empty",
|
||||||
|
query.name
|
||||||
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if !query.yes {
|
if !query.yes {
|
||||||
return Err(Error::BadRpc(
|
return Err(Error::BadRequest(
|
||||||
"Add --yes flag to really perform this operation".to_string(),
|
"Add --yes flag to really perform this operation".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -218,7 +222,7 @@ impl AdminRpcHandler {
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Message(format!("Key not found: {}", key_id)));
|
return Err(Error::BadRequest(format!("Key not found: {}", key_id)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 2. delete bucket alias
|
// 2. delete bucket alias
|
||||||
|
@ -237,7 +241,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&query.existing_bucket)
|
.resolve_global_bucket_name(&query.existing_bucket)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
let mut bucket = self
|
let mut bucket = self
|
||||||
.garage
|
.garage
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
|
@ -257,12 +261,12 @@ impl AdminRpcHandler {
|
||||||
query.new_name, bucket_id, key.key_id
|
query.new_name, bucket_id, key.key_id
|
||||||
)));
|
)));
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Message(format!("Alias {} already exists and points to different bucket: {:?} in namespace of key {}", query.new_name, existing_alias, key.key_id)));
|
return Err(Error::BadRequest(format!("Alias {} already exists and points to different bucket: {:?} in namespace of key {}", query.new_name, existing_alias, key.key_id)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !is_valid_bucket_name(&query.new_name) {
|
if !is_valid_bucket_name(&query.new_name) {
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::BadRequest(format!(
|
||||||
INVALID_BUCKET_NAME_MESSAGE!(),
|
INVALID_BUCKET_NAME_MESSAGE!(),
|
||||||
query.new_name
|
query.new_name
|
||||||
)));
|
)));
|
||||||
|
@ -312,7 +316,7 @@ impl AdminRpcHandler {
|
||||||
query.new_name, bucket_id
|
query.new_name, bucket_id
|
||||||
)));
|
)));
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::BadRequest(format!(
|
||||||
"Alias {} already exists and points to different bucket: {:?}",
|
"Alias {} already exists and points to different bucket: {:?}",
|
||||||
query.new_name, p.bucket_id
|
query.new_name, p.bucket_id
|
||||||
)));
|
)));
|
||||||
|
@ -330,7 +334,7 @@ impl AdminRpcHandler {
|
||||||
|
|
||||||
let alias = match alias {
|
let alias = match alias {
|
||||||
None => BucketAlias::new(query.new_name.clone(), bucket_id)
|
None => BucketAlias::new(query.new_name.clone(), bucket_id)
|
||||||
.ok_or_message(format!(INVALID_BUCKET_NAME_MESSAGE!(), query.new_name))?,
|
.ok_or_bad_request(format!(INVALID_BUCKET_NAME_MESSAGE!(), query.new_name))?,
|
||||||
Some(mut a) => {
|
Some(mut a) => {
|
||||||
a.state = Lww::raw(alias_ts, Deletable::present(AliasParams { bucket_id }));
|
a.state = Lww::raw(alias_ts, Deletable::present(AliasParams { bucket_id }));
|
||||||
a
|
a
|
||||||
|
@ -360,7 +364,7 @@ impl AdminRpcHandler {
|
||||||
.get(&query.name)
|
.get(&query.name)
|
||||||
.map(|a| a.into_option())
|
.map(|a| a.into_option())
|
||||||
.flatten()
|
.flatten()
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
let mut bucket = self
|
let mut bucket = self
|
||||||
.garage
|
.garage
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
|
@ -379,7 +383,7 @@ impl AdminRpcHandler {
|
||||||
.iter()
|
.iter()
|
||||||
.any(|((k, n), _, active)| *k == key.key_id && *n == query.name && *active);
|
.any(|((k, n), _, active)| *k == key.key_id && *n == query.name && *active);
|
||||||
if !has_other_aliases {
|
if !has_other_aliases {
|
||||||
return Err(Error::Message(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name)));
|
return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks ok, remove alias
|
// Checks ok, remove alias
|
||||||
|
@ -410,7 +414,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&query.name)
|
.resolve_global_bucket_name(&query.name)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
let mut bucket = self
|
let mut bucket = self
|
||||||
.garage
|
.garage
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
|
@ -429,7 +433,7 @@ impl AdminRpcHandler {
|
||||||
.iter()
|
.iter()
|
||||||
.any(|(_, _, active)| *active);
|
.any(|(_, _, active)| *active);
|
||||||
if !has_other_aliases {
|
if !has_other_aliases {
|
||||||
return Err(Error::Message(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name)));
|
return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", query.name)));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut alias = self
|
let mut alias = self
|
||||||
|
@ -461,7 +465,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&query.bucket)
|
.resolve_global_bucket_name(&query.bucket)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
let bucket = self
|
let bucket = self
|
||||||
.garage
|
.garage
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
|
@ -491,7 +495,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&query.bucket)
|
.resolve_global_bucket_name(&query.bucket)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
let bucket = self
|
let bucket = self
|
||||||
.garage
|
.garage
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
|
@ -521,7 +525,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_helper()
|
.bucket_helper()
|
||||||
.resolve_global_bucket_name(&query.bucket)
|
.resolve_global_bucket_name(&query.bucket)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_message("Bucket not found")?;
|
.ok_or_bad_request("Bucket not found")?;
|
||||||
|
|
||||||
let mut bucket = self
|
let mut bucket = self
|
||||||
.garage
|
.garage
|
||||||
|
@ -531,7 +535,7 @@ impl AdminRpcHandler {
|
||||||
let bucket_state = bucket.state.as_option_mut().unwrap();
|
let bucket_state = bucket.state.as_option_mut().unwrap();
|
||||||
|
|
||||||
if !(query.allow ^ query.deny) {
|
if !(query.allow ^ query.deny) {
|
||||||
return Err(Error::Message(
|
return Err(Error::BadRequest(
|
||||||
"You must specify exactly one flag, either --allow or --deny".to_string(),
|
"You must specify exactly one flag, either --allow or --deny".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -606,7 +610,7 @@ impl AdminRpcHandler {
|
||||||
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
|
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
|
||||||
let mut key = self.get_existing_key(&query.key_pattern).await?;
|
let mut key = self.get_existing_key(&query.key_pattern).await?;
|
||||||
if !query.yes {
|
if !query.yes {
|
||||||
return Err(Error::BadRpc(
|
return Err(Error::BadRequest(
|
||||||
"Add --yes flag to really perform this operation".to_string(),
|
"Add --yes flag to really perform this operation".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -659,7 +663,7 @@ impl AdminRpcHandler {
|
||||||
async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
|
async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
|
||||||
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
|
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
|
||||||
if prev_key.is_some() {
|
if prev_key.is_some() {
|
||||||
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
|
return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
|
||||||
}
|
}
|
||||||
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
|
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
|
||||||
self.garage.key_table.insert(&imported_key).await?;
|
self.garage.key_table.insert(&imported_key).await?;
|
||||||
|
@ -682,7 +686,7 @@ impl AdminRpcHandler {
|
||||||
.filter(|k| !k.state.is_deleted())
|
.filter(|k| !k.state.is_deleted())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
if candidates.len() != 1 {
|
if candidates.len() != 1 {
|
||||||
Err(Error::Message(format!(
|
Err(Error::BadRequest(format!(
|
||||||
"{} matching keys",
|
"{} matching keys",
|
||||||
candidates.len()
|
candidates.len()
|
||||||
)))
|
)))
|
||||||
|
@ -760,7 +764,7 @@ impl AdminRpcHandler {
|
||||||
|
|
||||||
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
|
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
|
||||||
if !opt.yes {
|
if !opt.yes {
|
||||||
return Err(Error::BadRpc(
|
return Err(Error::BadRequest(
|
||||||
"Please provide the --yes flag to initiate migration operation.".to_string(),
|
"Please provide the --yes flag to initiate migration operation.".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -776,7 +780,7 @@ impl AdminRpcHandler {
|
||||||
|
|
||||||
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
|
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
|
||||||
if !opt.yes {
|
if !opt.yes {
|
||||||
return Err(Error::BadRpc(
|
return Err(Error::BadRequest(
|
||||||
"Please provide the --yes flag to initiate repair operations.".to_string(),
|
"Please provide the --yes flag to initiate repair operations.".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
@ -803,7 +807,7 @@ impl AdminRpcHandler {
|
||||||
if failures.is_empty() {
|
if failures.is_empty() {
|
||||||
Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
|
Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
|
||||||
} else {
|
} else {
|
||||||
Err(Error::Message(format!(
|
Err(Error::BadRequest(format!(
|
||||||
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
|
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
|
||||||
failures
|
failures
|
||||||
)))
|
)))
|
||||||
|
@ -946,7 +950,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
||||||
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
|
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
|
||||||
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
|
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
|
||||||
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
|
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
|
||||||
_ => Err(Error::BadRpc("Invalid RPC".to_string())),
|
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,8 @@ use garage_rpc::layout::*;
|
||||||
use garage_rpc::system::*;
|
use garage_rpc::system::*;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
|
use garage_model::helper::error::Error as HelperError;
|
||||||
|
|
||||||
use crate::admin::*;
|
use crate::admin::*;
|
||||||
use crate::cli::*;
|
use crate::cli::*;
|
||||||
|
|
||||||
|
@ -14,14 +16,14 @@ pub async fn cli_command_dispatch(
|
||||||
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
|
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
|
||||||
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
|
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
|
||||||
rpc_host: NodeID,
|
rpc_host: NodeID,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), HelperError> {
|
||||||
match cmd {
|
match cmd {
|
||||||
Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
|
Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?),
|
||||||
Command::Node(NodeOperation::Connect(connect_opt)) => {
|
Command::Node(NodeOperation::Connect(connect_opt)) => {
|
||||||
cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await
|
Ok(cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await?)
|
||||||
}
|
}
|
||||||
Command::Layout(layout_opt) => {
|
Command::Layout(layout_opt) => {
|
||||||
cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await
|
Ok(cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await?)
|
||||||
}
|
}
|
||||||
Command::Bucket(bo) => {
|
Command::Bucket(bo) => {
|
||||||
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
|
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
|
||||||
|
@ -149,7 +151,7 @@ pub async fn cmd_connect(
|
||||||
println!("Success.");
|
println!("Success.");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
r => Err(Error::BadRpc(format!("Unexpected response: {:?}", r))),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,7 +159,7 @@ pub async fn cmd_admin(
|
||||||
rpc_cli: &Endpoint<AdminRpc, ()>,
|
rpc_cli: &Endpoint<AdminRpc, ()>,
|
||||||
rpc_host: NodeID,
|
rpc_host: NodeID,
|
||||||
args: AdminRpc,
|
args: AdminRpc,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), HelperError> {
|
||||||
match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
|
match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
|
||||||
AdminRpc::Ok(msg) => {
|
AdminRpc::Ok(msg) => {
|
||||||
println!("{}", msg);
|
println!("{}", msg);
|
||||||
|
|
|
@ -22,6 +22,8 @@ use garage_util::error::*;
|
||||||
use garage_rpc::system::*;
|
use garage_rpc::system::*;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
|
use garage_model::helper::error::Error as HelperError;
|
||||||
|
|
||||||
use admin::*;
|
use admin::*;
|
||||||
use cli::*;
|
use cli::*;
|
||||||
|
|
||||||
|
@ -136,5 +138,9 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
|
||||||
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
|
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
|
||||||
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
|
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
|
||||||
|
|
||||||
cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await
|
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
|
||||||
|
Err(HelperError::Internal(i)) => Err(i),
|
||||||
|
Err(HelperError::BadRequest(b)) => Err(Error::Message(format!("bad request: {}", b))),
|
||||||
|
Ok(x) => Ok(x),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ garage_model_050 = { package = "garage_model", version = "0.5.1" }
|
||||||
|
|
||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
arc-swap = "1.0"
|
arc-swap = "1.0"
|
||||||
|
err-derive = "0.3"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
|
|
@ -594,10 +594,8 @@ impl BlockManager {
|
||||||
need_nodes.push(*node);
|
need_nodes.push(*node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
m => {
|
||||||
return Err(Error::Message(
|
return Err(Error::unexpected_rpc_message(m));
|
||||||
"Unexpected response to NeedBlockQuery RPC".to_string(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -730,7 +728,7 @@ impl EndpointHandler<BlockRpc> for BlockManager {
|
||||||
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
|
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
|
||||||
BlockRpc::GetBlock(h) => self.read_block(h).await,
|
BlockRpc::GetBlock(h) => self.read_block(h).await,
|
||||||
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
|
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
|
||||||
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,8 @@ use garage_table::*;
|
||||||
use crate::block::*;
|
use crate::block::*;
|
||||||
use crate::block_ref_table::*;
|
use crate::block_ref_table::*;
|
||||||
use crate::bucket_alias_table::*;
|
use crate::bucket_alias_table::*;
|
||||||
use crate::bucket_helper::*;
|
|
||||||
use crate::bucket_table::*;
|
use crate::bucket_table::*;
|
||||||
|
use crate::helper;
|
||||||
use crate::key_table::*;
|
use crate::key_table::*;
|
||||||
use crate::object_table::*;
|
use crate::object_table::*;
|
||||||
use crate::version_table::*;
|
use crate::version_table::*;
|
||||||
|
@ -162,7 +162,7 @@ impl Garage {
|
||||||
self.block_manager.garage.swap(None);
|
self.block_manager.garage.swap(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn bucket_helper(&self) -> BucketHelper {
|
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
|
||||||
BucketHelper(self)
|
helper::bucket::BucketHelper(self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
use garage_util::data::*;
|
|
||||||
use garage_util::error::*;
|
|
||||||
|
|
||||||
use garage_table::util::EmptyKey;
|
use garage_table::util::EmptyKey;
|
||||||
|
use garage_util::data::*;
|
||||||
|
|
||||||
use crate::bucket_table::Bucket;
|
use crate::bucket_table::Bucket;
|
||||||
use crate::garage::Garage;
|
use crate::garage::Garage;
|
||||||
|
use crate::helper::error::*;
|
||||||
|
|
||||||
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
|
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
|
||||||
|
|
||||||
|
@ -52,12 +51,6 @@ impl<'a> BucketHelper<'a> {
|
||||||
.get(&bucket_id, &EmptyKey)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.filter(|b| !b.is_deleted())
|
.filter(|b| !b.is_deleted())
|
||||||
.map(Ok)
|
.ok_or_bad_request(format!("Bucket {:?} does not exist", bucket_id))
|
||||||
.unwrap_or_else(|| {
|
|
||||||
Err(Error::BadRpc(format!(
|
|
||||||
"Bucket {:?} does not exist",
|
|
||||||
bucket_id
|
|
||||||
)))
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
51
src/model/helper/error.rs
Normal file
51
src/model/helper/error.rs
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
use err_derive::Error;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
|
#[derive(Debug, Error, Serialize, Deserialize)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error(display = "Internal error: {}", _0)]
|
||||||
|
Internal(#[error(source)] GarageError),
|
||||||
|
|
||||||
|
#[error(display = "Bad request: {}", _0)]
|
||||||
|
BadRequest(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<netapp::error::Error> for Error {
|
||||||
|
fn from(e: netapp::error::Error) -> Self {
|
||||||
|
Error::Internal(GarageError::Netapp(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait OkOrBadRequest {
|
||||||
|
type S;
|
||||||
|
fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<Self::S, Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, E> OkOrBadRequest for Result<T, E>
|
||||||
|
where
|
||||||
|
E: std::fmt::Display,
|
||||||
|
{
|
||||||
|
type S = T;
|
||||||
|
fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
|
match self {
|
||||||
|
Ok(x) => Ok(x),
|
||||||
|
Err(e) => Err(Error::BadRequest(format!(
|
||||||
|
"{}: {}",
|
||||||
|
reason.as_ref(),
|
||||||
|
e.to_string()
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> OkOrBadRequest for Option<T> {
|
||||||
|
type S = T;
|
||||||
|
fn ok_or_bad_request<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
|
match self {
|
||||||
|
Some(x) => Ok(x),
|
||||||
|
None => Err(Error::BadRequest(reason.as_ref().to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
2
src/model/helper/mod.rs
Normal file
2
src/model/helper/mod.rs
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
pub mod bucket;
|
||||||
|
pub mod error;
|
|
@ -12,6 +12,6 @@ pub mod version_table;
|
||||||
|
|
||||||
pub mod block;
|
pub mod block;
|
||||||
|
|
||||||
pub mod bucket_helper;
|
|
||||||
pub mod garage;
|
pub mod garage;
|
||||||
|
pub mod helper;
|
||||||
pub mod migrate;
|
pub mod migrate;
|
||||||
|
|
|
@ -576,7 +576,7 @@ impl EndpointHandler<SystemRpc> for System {
|
||||||
self.clone().handle_advertise_cluster_layout(adv).await
|
self.clone().handle_advertise_cluster_layout(adv).await
|
||||||
}
|
}
|
||||||
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
|
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
|
||||||
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -315,7 +315,7 @@ where
|
||||||
}
|
}
|
||||||
Ok(GcRpc::Ok)
|
Ok(GcRpc::Ok)
|
||||||
}
|
}
|
||||||
_ => Err(Error::Message("Unexpected GC RPC".to_string())),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -514,10 +514,7 @@ where
|
||||||
if let SyncRpc::Ok = rpc_resp {
|
if let SyncRpc::Ok = rpc_resp {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(Error::Message(format!(
|
Err(Error::unexpected_rpc_message(rpc_resp))
|
||||||
"Unexpected response to RPC Update: {}",
|
|
||||||
debug_serialize(&rpc_resp)
|
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -545,7 +542,7 @@ where
|
||||||
self.data.update_many(items)?;
|
self.data.update_many(items)?;
|
||||||
Ok(SyncRpc::Ok)
|
Ok(SyncRpc::Ok)
|
||||||
}
|
}
|
||||||
_ => Err(Error::Message("Unexpected sync RPC".to_string())),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -311,7 +311,7 @@ where
|
||||||
self.data.update_many(pairs)?;
|
self.data.update_many(pairs)?;
|
||||||
Ok(TableRpc::Ok)
|
Ok(TableRpc::Ok)
|
||||||
}
|
}
|
||||||
_ => Err(Error::BadRpc("Unexpected table RPC".to_string())),
|
m => Err(Error::unexpected_rpc_message(m)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,8 +59,8 @@ pub enum Error {
|
||||||
)]
|
)]
|
||||||
Quorum(usize, usize, usize, Vec<String>),
|
Quorum(usize, usize, usize, Vec<String>),
|
||||||
|
|
||||||
#[error(display = "Bad RPC: {}", _0)]
|
#[error(display = "Unexpected RPC message: {}", _0)]
|
||||||
BadRpc(String),
|
UnexpectedRpcMessage(String),
|
||||||
|
|
||||||
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
|
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
|
||||||
CorruptData(Hash),
|
CorruptData(Hash),
|
||||||
|
@ -69,6 +69,12 @@ pub enum Error {
|
||||||
Message(String),
|
Message(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Error {
|
||||||
|
pub fn unexpected_rpc_message<T: Serialize>(v: T) -> Self {
|
||||||
|
Self::UnexpectedRpcMessage(debug_serialize(&v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<sled::transaction::TransactionError<Error>> for Error {
|
impl From<sled::transaction::TransactionError<Error>> for Error {
|
||||||
fn from(e: sled::transaction::TransactionError<Error>) -> Error {
|
fn from(e: sled::transaction::TransactionError<Error>) -> Error {
|
||||||
match e {
|
match e {
|
||||||
|
|
Loading…
Reference in a new issue