From a50fa70d45f8b5af68d23d60c3bac2af4ecceb58 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 8 Nov 2020 15:04:30 +0100 Subject: [PATCH 1/4] Refactor error management in API part --- src/api/Cargo.toml | 1 + src/api/api_server.rs | 19 +++---- src/api/error.rs | 116 ++++++++++++++++++++++++++++++++++++++++ src/api/lib.rs | 2 + src/api/s3_copy.rs | 3 +- src/api/s3_delete.rs | 5 +- src/api/s3_get.rs | 15 +++--- src/api/s3_put.rs | 32 ++++++----- src/api/signature.rs | 60 ++++++++------------- src/garage/admin_rpc.rs | 19 +++---- src/model/block.rs | 2 +- src/rpc/membership.rs | 2 +- src/rpc/rpc_server.rs | 5 +- src/table/table.rs | 2 +- src/table/table_sync.rs | 4 +- src/util/error.rs | 37 +------------ 16 files changed, 193 insertions(+), 131 deletions(-) create mode 100644 src/api/error.rs diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 4e0599d5..578cb9d5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -17,6 +17,7 @@ garage_util = { version = "0.1", path = "../util" } garage_table = { version = "0.1.1", path = "../table" } garage_model = { version = "0.1.1", path = "../model" } +err-derive = "0.2.3" bytes = "0.4" hex = "0.3" log = "0.4" diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 9dc74dac..ec02572d 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -7,10 +7,11 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server}; -use garage_util::error::Error; +use garage_util::error::Error as GarageError; use garage_model::garage::Garage; +use crate::error::*; use crate::signature::check_signature; use crate::s3_copy::*; @@ -22,14 +23,14 @@ use crate::s3_put::*; pub async fn run_api_server( garage: Arc, shutdown_signal: impl Future, -) -> Result<(), Error> { +) -> Result<(), GarageError> { let addr = &garage.config.s3_api.api_bind_addr; let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); let client_addr = conn.remote_addr(); async move { - Ok::<_, Error>(service_fn(move |req: Request| { + Ok::<_, GarageError>(service_fn(move |req: Request| { let garage = garage.clone(); handler(garage, req, client_addr) })) @@ -49,7 +50,7 @@ async fn handler( garage: Arc, req: Request, addr: SocketAddr, -) -> Result, Error> { +) -> Result, GarageError> { info!("{} {} {}", addr, req.method(), req.uri()); debug!("{:?}", req); match handler_inner(garage, req).await { @@ -131,10 +132,7 @@ async fn handler_inner(garage: Arc, req: Request) -> Result return Err(Error::BadRequest(format!("No source key specified"))), - Some(x) => x, - }; + let source_key = source_key.ok_or_bad_request("No source key specified")?; Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?) } else { // PutObject query @@ -205,9 +203,8 @@ async fn handler_inner(garage: Arc, req: Request) -> Result().map_err(|e| { - Error::BadRequest(format!("Invalid value for max-keys: {}", e)) - }) + x.parse::() + .ok_or_bad_request("Invalid value for max-keys") }) .unwrap_or(Ok(1000))?; let prefix = params.get("prefix").map(|x| x.as_str()).unwrap_or(&""); diff --git a/src/api/error.rs b/src/api/error.rs new file mode 100644 index 00000000..ddb021db --- /dev/null +++ b/src/api/error.rs @@ -0,0 +1,116 @@ +use err_derive::Error; +use hyper::StatusCode; + +use garage_util::error::Error as GarageError; + +#[derive(Debug, Error)] +pub enum Error { + // Category: internal error + #[error(display = "Internal error: {}", _0)] + InternalError(#[error(source)] GarageError), + + #[error(display = "Internal error (Hyper error): {}", _0)] + Hyper(#[error(source)] hyper::Error), + + #[error(display = "Internal error (HTTP error): {}", _0)] + HTTP(#[error(source)] http::Error), + + // Category: cannot process + #[error(display = "Forbidden: {}", _0)] + Forbidden(String), + + #[error(display = "Not found")] + NotFound, + + // Category: bad request + #[error(display = "Invalid UTF-8: {}", _0)] + InvalidUTF8(#[error(source)] std::str::Utf8Error), + + #[error(display = "Invalid XML: {}", _0)] + InvalidXML(#[error(source)] roxmltree::Error), + + #[error(display = "Invalid header value: {}", _0)] + InvalidHeader(#[error(source)] hyper::header::ToStrError), + + #[error(display = "Invalid HTTP range: {:?}", _0)] + InvalidRange(#[error(from)] http_range::HttpRangeParseError), + + #[error(display = "Bad request: {}", _0)] + BadRequest(String), +} + +impl Error { + pub fn http_status_code(&self) -> StatusCode { + match self { + Error::NotFound => StatusCode::NOT_FOUND, + Error::Forbidden(_) => StatusCode::FORBIDDEN, + Error::InternalError(GarageError::RPC(_)) => StatusCode::SERVICE_UNAVAILABLE, + Error::InternalError(_) | Error::Hyper(_) | Error::HTTP(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } + _ => StatusCode::BAD_REQUEST, + } + } +} + +pub trait OkOrBadRequest { + type S2; + fn ok_or_bad_request(self, reason: &'static str) -> Self::S2; +} + +impl OkOrBadRequest for Result +where + E: std::fmt::Display, +{ + type S2 = Result; + fn ok_or_bad_request(self, reason: &'static str) -> Result { + match self { + Ok(x) => Ok(x), + Err(e) => Err(Error::BadRequest(format!("{}: {}", reason, e))), + } + } +} + +impl OkOrBadRequest for Option { + type S2 = Result; + fn ok_or_bad_request(self, reason: &'static str) -> Result { + match self { + Some(x) => Ok(x), + None => Err(Error::BadRequest(format!("{}", reason))), + } + } +} + +pub trait OkOrInternalError { + type S2; + fn ok_or_internal_error(self, reason: &'static str) -> Self::S2; +} + +impl OkOrInternalError for Result +where + E: std::fmt::Display, +{ + type S2 = Result; + fn ok_or_internal_error(self, reason: &'static str) -> Result { + match self { + Ok(x) => Ok(x), + Err(e) => Err(Error::InternalError(GarageError::Message(format!( + "{}: {}", + reason, e + )))), + } + } +} + +impl OkOrInternalError for Option { + type S2 = Result; + fn ok_or_internal_error(self, reason: &'static str) -> Result { + match self { + Some(x) => Ok(x), + None => Err(Error::InternalError(GarageError::Message(format!( + "{}", + reason + )))), + } + } +} diff --git a/src/api/lib.rs b/src/api/lib.rs index df2fd045..9bb07925 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -1,6 +1,8 @@ #[macro_use] extern crate log; +pub mod error; + pub mod encoding; pub mod api_server; diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index db790d95..4280f4bf 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -6,13 +6,14 @@ use hyper::{Body, Response}; use garage_table::*; use garage_util::data::*; -use garage_util::error::Error; use garage_model::block_ref_table::*; use garage_model::garage::Garage; use garage_model::object_table::*; use garage_model::version_table::*; +use crate::error::*; + pub async fn handle_copy( garage: Arc, dest_bucket: &str, diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 42216f51..33e47c17 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -4,12 +4,12 @@ use std::sync::Arc; use hyper::{Body, Request, Response}; use garage_util::data::*; -use garage_util::error::Error; use garage_model::garage::Garage; use garage_model::object_table::*; use crate::encoding::*; +use crate::error::*; async fn handle_delete_internal( garage: &Garage, @@ -85,8 +85,7 @@ pub async fn handle_delete_objects( ) -> Result, Error> { let body = hyper::body::to_bytes(req.into_body()).await?; let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?; - let cmd = parse_delete_objects_xml(&cmd_xml) - .map_err(|e| Error::BadRequest(format!("Invald delete XML query: {}", e)))?; + let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; let mut retxml = String::new(); writeln!(&mut retxml, r#""#).unwrap(); diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 68558dbe..71c656f2 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -5,13 +5,13 @@ use futures::stream::*; use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; -use garage_util::error::Error; - use garage_table::EmptyKey; use garage_model::garage::Garage; use garage_model::object_table::*; +use crate::error::*; + fn object_headers( version: &ObjectVersion, version_meta: &ObjectVersionMeta, @@ -111,11 +111,8 @@ pub async fn handle_get( let range = match req.headers().get("range") { Some(range) => { - let range_str = range - .to_str() - .map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?; - let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size) - .map_err(|_e| Error::BadRequest(format!("Invalid range")))?; + let range_str = range.to_str()?; + let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)?; if ranges.len() > 1 { return Err(Error::BadRequest(format!("Multiple ranges not supported"))); } else { @@ -210,7 +207,9 @@ pub async fn handle_get_range( let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); Ok(resp_builder.body(body)?) } else { - Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been"))) + None.ok_or_internal_error( + "Requested range not present in inline bytes when it should have been", + ) } } ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 0926ba89..ea09524c 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -9,8 +9,9 @@ use sha2::{Digest as Sha256Digest, Sha256}; use garage_table::*; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::Error as GarageError; +use crate::error::*; use garage_model::block::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; @@ -85,7 +86,7 @@ pub async fn handle_put( // Validate MD5 sum against content-md5 header and sha256sum against signed content-sha256 if let Some(expected_sha256) = content_sha256 { if expected_sha256 != sha256sum { - return Err(Error::Message(format!( + return Err(Error::BadRequest(format!( "Unable to validate x-amz-content-sha256" ))); } else { @@ -94,7 +95,7 @@ pub async fn handle_put( } if let Some(expected_md5) = content_md5 { if expected_md5.trim_matches('"') != md5sum { - return Err(Error::Message(format!("Unable to validate content-md5"))); + return Err(Error::BadRequest(format!("Unable to validate content-md5"))); } else { trace!("Successfully validated content-md5"); } @@ -184,7 +185,7 @@ async fn put_block_meta( offset: u64, hash: Hash, size: u64, -) -> Result<(), Error> { +) -> Result<(), GarageError> { // TODO: don't clone, restart from empty block list ?? let mut version = version.clone(); version @@ -225,7 +226,7 @@ impl BodyChunker { buf: VecDeque::new(), } } - async fn next(&mut self) -> Result>, Error> { + async fn next(&mut self) -> Result>, GarageError> { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.body.next().await { let bytes = block?; @@ -305,10 +306,9 @@ pub async fn handle_put_part( // Check parameters let part_number = part_number_str .parse::() - .map_err(|e| Error::BadRequest(format!("Invalid part number: {}", e)))?; + .ok_or_bad_request("Invalid part number")?; - let version_uuid = - uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), @@ -359,7 +359,7 @@ pub async fn handle_put_part( // Validate MD5 sum against content-md5 header and sha256sum against signed content-sha256 if let Some(expected_sha256) = content_sha256 { if expected_sha256 != sha256sum { - return Err(Error::Message(format!( + return Err(Error::BadRequest(format!( "Unable to validate x-amz-content-sha256" ))); } else { @@ -368,7 +368,7 @@ pub async fn handle_put_part( } if let Some(expected_md5) = content_md5 { if expected_md5.trim_matches('"') != md5sum { - return Err(Error::Message(format!("Unable to validate content-md5"))); + return Err(Error::BadRequest(format!("Unable to validate content-md5"))); } else { trace!("Successfully validated content-md5"); } @@ -384,8 +384,7 @@ pub async fn handle_complete_multipart_upload( key: &str, upload_id: &str, ) -> Result, Error> { - let version_uuid = - uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = decode_upload_id(upload_id)?; let bucket = bucket.to_string(); let key = key.to_string(); @@ -469,8 +468,7 @@ pub async fn handle_abort_multipart_upload( key: &str, upload_id: &str, ) -> Result, Error> { - let version_uuid = - uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = decode_upload_id(upload_id)?; let object = garage .object_table @@ -532,10 +530,10 @@ fn get_headers(req: &Request) -> Result { }) } -fn uuid_from_str(id: &str) -> Result { - let id_bin = hex::decode(id).map_err(|_| ())?; +fn decode_upload_id(id: &str) -> Result { + let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?; if id_bin.len() != 32 { - return Err(()); + return None.ok_or_bad_request("Invalid upload ID"); } let mut uuid = [0u8; 32]; uuid.copy_from_slice(&id_bin[..]); diff --git a/src/api/signature.rs b/src/api/signature.rs index 6e23afda..402b1881 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -7,12 +7,12 @@ use sha2::{Digest, Sha256}; use garage_table::*; use garage_util::data::Hash; -use garage_util::error::Error; use garage_model::garage::Garage; use garage_model::key_table::*; use crate::encoding::uri_encode; +use crate::error::*; const SHORT_DATE: &str = "%Y%m%d"; const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; @@ -42,9 +42,9 @@ pub async fn check_signature( let date = headers .get("x-amz-date") - .ok_or(Error::BadRequest("Missing X-Amz-Date field".into()))?; + .ok_or_bad_request("Missing X-Amz-Date field")?; let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .map_err(|e| Error::BadRequest(format!("Invalid date: {}", e)))? + .ok_or_bad_request("Invalid date")? .into(); let date: DateTime = DateTime::from_utc(date, Utc); @@ -90,7 +90,7 @@ pub async fn check_signature( &garage.config.s3_api.s3_region, "s3", ) - .map_err(|e| Error::Message(format!("Unable to build signing HMAC: {}", e)))?; + .ok_or_internal_error("Unable to build signing HMAC")?; hmac.input(string_to_sign.as_bytes()); let signature = hex::encode(hmac.result().code()); @@ -104,9 +104,8 @@ pub async fn check_signature( let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { None } else { - let bytes = hex::decode(authorization.content_sha256).or(Err(Error::BadRequest( - format!("Invalid content sha256 hash"), - )))?; + let bytes = hex::decode(authorization.content_sha256) + .ok_or_bad_request("Invalid content sha256 hash")?; let mut hash = [0u8; 32]; if bytes.len() != 32 { return Err(Error::BadRequest(format!("Invalid content sha256 hash"))); @@ -132,7 +131,7 @@ fn parse_authorization( ) -> Result { let first_space = authorization .find(' ') - .ok_or(Error::BadRequest("Authorization field too short".into()))?; + .ok_or_bad_request("Authorization field to short")?; let (auth_kind, rest) = authorization.split_at(first_space); if auth_kind != "AWS4-HMAC-SHA256" { @@ -142,41 +141,32 @@ fn parse_authorization( let mut auth_params = HashMap::new(); for auth_part in rest.split(',') { let auth_part = auth_part.trim(); - let eq = auth_part.find('=').ok_or(Error::BadRequest(format!( - "Missing =value in authorization field {}", - auth_part - )))?; + let eq = auth_part + .find('=') + .ok_or_bad_request("Field without value in authorization header")?; let (key, value) = auth_part.split_at(eq); auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string()); } let cred = auth_params .get("Credential") - .ok_or(Error::BadRequest(format!( - "Could not find Credential in Authorization field" - )))?; + .ok_or_bad_request("Could not find Credential in Authorization field")?; let (key_id, scope) = parse_credential(cred)?; let content_sha256 = headers .get("x-amz-content-sha256") - .ok_or(Error::BadRequest( - "Missing X-Amz-Content-Sha256 field".into(), - ))?; + .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?; let auth = Authorization { key_id, scope, signed_headers: auth_params .get("SignedHeaders") - .ok_or(Error::BadRequest(format!( - "Could not find SignedHeaders in Authorization field" - )))? + .ok_or_bad_request("Could not find SignedHeaders in Authorization field")? .to_string(), signature: auth_params .get("Signature") - .ok_or(Error::BadRequest(format!( - "Could not find Signature in Authorization field" - )))? + .ok_or_bad_request("Could not find Signature in Authorization field")? .to_string(), content_sha256: content_sha256.to_string(), }; @@ -186,9 +176,7 @@ fn parse_authorization( fn parse_query_authorization(headers: &HashMap) -> Result { let algo = headers .get("x-amz-algorithm") - .ok_or(Error::BadRequest(format!( - "X-Amz-Algorithm not found in query parameters" - )))?; + .ok_or_bad_request("X-Amz-Algorithm not found in query parameters")?; if algo != "AWS4-HMAC-SHA256" { return Err(Error::BadRequest(format!( "Unsupported authorization method" @@ -197,20 +185,14 @@ fn parse_query_authorization(headers: &HashMap) -> Result) -> Result Result<(String, String), Error> { - let first_slash = cred.find('/').ok_or(Error::BadRequest(format!( - "Credentials does not contain / in authorization field" - )))?; + let first_slash = cred + .find('/') + .ok_or_bad_request("Credentials does not contain / in authorization field")?; let (key_id, scope) = cred.split_at(first_slash); Ok(( key_id.to_string(), diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index 1dd118ac..b29f2f77 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -55,7 +55,7 @@ impl AdminRpcHandler { AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, - _ => Err(Error::BadRequest(format!("Invalid RPC"))), + _ => Err(Error::BadRPC(format!("Invalid RPC"))), } } }); @@ -81,7 +81,7 @@ impl AdminRpcHandler { BucketOperation::Create(query) => { let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRequest(format!( + return Err(Error::BadRPC(format!( "Bucket {} already exists", query.name ))); @@ -104,13 +104,10 @@ impl AdminRpcHandler { .get_range(&query.name, None, Some(()), 10) .await?; if !objects.is_empty() { - return Err(Error::BadRequest(format!( - "Bucket {} is not empty", - query.name - ))); + return Err(Error::BadRPC(format!("Bucket {} is not empty", query.name))); } if !query.yes { - return Err(Error::BadRequest(format!( + return Err(Error::BadRPC(format!( "Add --yes flag to really perform this operation" ))); } @@ -199,7 +196,7 @@ impl AdminRpcHandler { KeyOperation::Delete(query) => { let key = self.get_existing_key(&query.key_id).await?; if !query.yes { - return Err(Error::BadRequest(format!( + return Err(Error::BadRPC(format!( "Add --yes flag to really perform this operation" ))); } @@ -233,7 +230,7 @@ impl AdminRpcHandler { .await? .filter(|b| !b.deleted) .map(Ok) - .unwrap_or(Err(Error::BadRequest(format!( + .unwrap_or(Err(Error::BadRPC(format!( "Bucket {} does not exist", bucket )))) @@ -246,7 +243,7 @@ impl AdminRpcHandler { .await? .filter(|k| !k.deleted) .map(Ok) - .unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) + .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id)))) } async fn update_bucket_key( @@ -306,7 +303,7 @@ impl AdminRpcHandler { async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { if !opt.yes { - return Err(Error::BadRequest(format!( + return Err(Error::BadRPC(format!( "Please provide the --yes flag to initiate repair operations." ))); } diff --git a/src/model/block.rs b/src/model/block.rs index af8b9efb..4e8bb7d9 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -123,7 +123,7 @@ impl BlockManager { Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), - _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), + _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), } } diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index d19c1eb7..697cddd0 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -373,7 +373,7 @@ impl System { Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await, Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await, - _ => Err(Error::BadRequest(format!("Unexpected RPC message"))), + _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), } } }); diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 4386d733..4113f15b 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -61,7 +61,10 @@ where let err_str = format!("{}", e); let rep_bytes = rmp_to_vec_all_named::>(&Err(err_str))?; let mut err_response = Response::new(Body::from(rep_bytes)); - *err_response.status_mut() = e.http_status_code(); + *err_response.status_mut() = match e { + Error::BadRPC(_) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; warn!( "RPC error ({}): {} ({} ms)", name, diff --git a/src/table/table.rs b/src/table/table.rs index 9d43a475..2beac3f4 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -329,7 +329,7 @@ where .await?; Ok(TableRPC::SyncRPC(response)) } - _ => Err(Error::BadRequest(format!("Unexpected table RPC"))), + _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 145b3068..6c0df15b 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -474,7 +474,7 @@ where todo.push_back(root_ck); } } else { - return Err(Error::BadRequest(format!( + return Err(Error::Message(format!( "Invalid respone to GetRootChecksumRange RPC: {}", debug_serialize(root_cks_resp) ))); @@ -530,7 +530,7 @@ where self.send_items(who, items_to_send).await?; } } else { - return Err(Error::BadRequest(format!( + return Err(Error::Message(format!( "Unexpected response to sync RPC checksums: {}", debug_serialize(&rpc_resp) ))); diff --git a/src/util/error.rs b/src/util/error.rs index 0ca1afe7..e5dcf654 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -54,9 +54,6 @@ pub enum Error { #[error(display = "TOML decode error: {}", _0)] TomlDecode(#[error(source)] toml::de::Error), - #[error(display = "Timeout: {}", _0)] - RPCTimeout(#[error(source)] tokio::time::Elapsed), - #[error(display = "Tokio join error: {}", _0)] TokioJoin(#[error(source)] tokio::task::JoinError), @@ -66,14 +63,8 @@ pub enum Error { #[error(display = "Remote error: {} (status code {})", _0, _1)] RemoteError(String, StatusCode), - #[error(display = "Bad request: {}", _0)] - BadRequest(String), - - #[error(display = "Forbidden: {}", _0)] - Forbidden(String), - - #[error(display = "Not found")] - NotFound, + #[error(display = "Bad RPC: {}", _0)] + BadRPC(String), #[error(display = "Corrupt data: does not match hash {:?}", _0)] CorruptData(Hash), @@ -82,18 +73,6 @@ pub enum Error { Message(String), } -impl Error { - pub fn http_status_code(&self) -> StatusCode { - match self { - Error::BadRequest(_) => StatusCode::BAD_REQUEST, - Error::NotFound => StatusCode::NOT_FOUND, - Error::Forbidden(_) => StatusCode::FORBIDDEN, - Error::RPC(_) => StatusCode::SERVICE_UNAVAILABLE, - _ => StatusCode::INTERNAL_SERVER_ERROR, - } - } -} - impl From> for Error { fn from(e: sled::TransactionError) -> Error { match e { @@ -114,15 +93,3 @@ impl From> for Error { Error::Message(format!("MPSC send error")) } } - -impl From for Error { - fn from(e: std::str::Utf8Error) -> Error { - Error::BadRequest(format!("Invalid UTF-8: {}", e)) - } -} - -impl From for Error { - fn from(e: roxmltree::Error) -> Error { - Error::BadRequest(format!("Invalid XML: {}", e)) - } -} From 5a5592c176c7a17f143d0f221b6495808b7f4054 Mon Sep 17 00:00:00 2001 From: Quentin Date: Wed, 11 Nov 2020 16:12:42 +0100 Subject: [PATCH 2/4] Replace with option syntaxic sugar --- src/api/s3_copy.rs | 20 ++++++-------------- src/api/s3_delete.rs | 21 +++++---------------- src/api/s3_get.rs | 35 +++++++++++------------------------ src/api/s3_list.rs | 7 +++---- src/api/s3_put.rs | 34 ++++++++++------------------------ 5 files changed, 35 insertions(+), 82 deletions(-) diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index 4280f4bf..5997e4fe 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -21,25 +21,20 @@ pub async fn handle_copy( source_bucket: &str, source_key: &str, ) -> Result, Error> { - let source_object = match garage + let source_object = garage .object_table .get(&source_bucket.to_string(), &source_key.to_string()) .await? - { - None => return Err(Error::NotFound), - Some(o) => o, - }; + .ok_or(Error::NotFound)?; - let source_last_v = match source_object + let source_last_v = source_object .versions() .iter() .rev() .filter(|v| v.is_complete()) .next() - { - Some(v) => v, - None => return Err(Error::NotFound), - }; + .ok_or(Error::NotFound)?; + let source_last_state = match &source_last_v.state { ObjectVersionState::Complete(x) => x, _ => unreachable!(), @@ -69,10 +64,7 @@ pub async fn handle_copy( .version_table .get(&source_last_v.uuid, &EmptyKey) .await?; - let source_version = match source_version { - Some(v) => v, - None => return Err(Error::NotFound), - }; + let source_version = source_version.ok_or(Error::NotFound)?; let dest_version = Version::new( new_uuid, diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 33e47c17..f46bfcfe 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -16,17 +16,11 @@ async fn handle_delete_internal( bucket: &str, key: &str, ) -> Result<(UUID, UUID), Error> { - let object = match garage + let object = garage .object_table .get(&bucket.to_string(), &key.to_string()) .await? - { - None => { - // No need to delete - return Err(Error::NotFound); - } - Some(o) => o, - }; + .ok_or(Error::NotFound)?; // No need to delete let interesting_versions = object.versions().iter().filter(|v| match v.state { ObjectVersionState::Aborted => false, @@ -43,10 +37,7 @@ async fn handle_delete_internal( timestamp = std::cmp::max(timestamp, v.timestamp + 1); } - let deleted_version = match must_delete { - None => return Err(Error::NotFound), - Some(v) => v, - }; + let deleted_version = must_delete.ok_or(Error::NotFound)?; let version_uuid = gen_uuid(); @@ -142,10 +133,8 @@ fn parse_delete_objects_xml(xml: &roxmltree::Document) -> Result del, - None => return Err(format!("Delete tag not found")), - }; + let delete = root.first_child().ok_or(format!("Delete tag not found"))?; + if !delete.has_tag_name("Delete") { return Err(format!("Invalid root tag: {:?}", root)); } diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 71c656f2..a68c485b 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -41,25 +41,20 @@ pub async fn handle_head( bucket: &str, key: &str, ) -> Result, Error> { - let object = match garage + let object = garage .object_table .get(&bucket.to_string(), &key.to_string()) .await? - { - None => return Err(Error::NotFound), - Some(o) => o, - }; + .ok_or(Error::NotFound)?; - let version = match object + let version = object .versions() .iter() .rev() .filter(|v| v.is_data()) .next() - { - Some(v) => v, - None => return Err(Error::NotFound), - }; + .ok_or(Error::NotFound)?; + let version_meta = match &version.state { ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, @@ -80,25 +75,20 @@ pub async fn handle_get( bucket: &str, key: &str, ) -> Result, Error> { - let object = match garage + let object = garage .object_table .get(&bucket.to_string(), &key.to_string()) .await? - { - None => return Err(Error::NotFound), - Some(o) => o, - }; + .ok_or(Error::NotFound)?; - let last_v = match object + let last_v = object .versions() .iter() .rev() .filter(|v| v.is_complete()) .next() - { - Some(v) => v, - None => return Err(Error::NotFound), - }; + .ok_or(Error::NotFound)?; + let last_v_data = match &last_v.state { ObjectVersionState::Complete(x) => x, _ => unreachable!(), @@ -146,10 +136,7 @@ pub async fn handle_get( let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; - let version = match version { - Some(v) => v, - None => return Err(Error::NotFound), - }; + let version = version.ok_or(Error::NotFound)?; let mut blocks = version .blocks() diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 0a3b62ec..9e6bc0f5 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -64,10 +64,9 @@ pub async fn handle_list( } let common_prefix = if delimiter.len() > 0 { let relative_key = &object.key[prefix.len()..]; - match relative_key.find(delimiter) { - Some(i) => Some(&object.key[..prefix.len() + i + delimiter.len()]), - None => None, - } + relative_key + .find(delimiter) + .and_then(move |i| Some(&object.key[..prefix.len() + i + delimiter.len()])) } else { None }; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index ea09524c..72613323 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -37,10 +37,7 @@ pub async fn handle_put( let body = req.into_body(); let mut chunker = BodyChunker::new(body, garage.config.block_size); - let first_block = match chunker.next().await? { - Some(x) => x, - None => vec![], - }; + let first_block = chunker.next().await?.unwrap_or(vec![]); let mut object_version = ObjectVersion { uuid: version_uuid, @@ -325,14 +322,9 @@ pub async fn handle_put_part( let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; // Check object is valid and multipart block can be accepted - let first_block = match first_block { - None => return Err(Error::BadRequest(format!("Empty body"))), - Some(x) => x, - }; - let object = match object { - None => return Err(Error::BadRequest(format!("Object not found"))), - Some(x) => x, - }; + let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; + let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + if !object .versions() .iter() @@ -392,10 +384,8 @@ pub async fn handle_complete_multipart_upload( garage.object_table.get(&bucket, &key), garage.version_table.get(&version_uuid, &EmptyKey), )?; - let object = match object { - None => return Err(Error::BadRequest(format!("Object not found"))), - Some(x) => x, - }; + let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + let object_version = object .versions() .iter() @@ -408,10 +398,8 @@ pub async fn handle_complete_multipart_upload( } Some(x) => x.clone(), }; - let version = match version { - None => return Err(Error::BadRequest(format!("Version not found"))), - Some(x) => x, - }; + let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; + if version.blocks().len() == 0 { return Err(Error::BadRequest(format!("No data was uploaded"))); } @@ -474,10 +462,8 @@ pub async fn handle_abort_multipart_upload( .object_table .get(&bucket.to_string(), &key.to_string()) .await?; - let object = match object { - None => return Err(Error::BadRequest(format!("Object not found"))), - Some(x) => x, - }; + let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + let object_version = object .versions() .iter() From 7d7b9e95a9b7fe71e7bb42b08a22698475a9c78c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Nov 2020 16:36:48 +0100 Subject: [PATCH 3/4] Simplify and_then(Some) as map() and remove move --- src/api/s3_list.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 9e6bc0f5..f2b49a1d 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -66,7 +66,7 @@ pub async fn handle_list( let relative_key = &object.key[prefix.len()..]; relative_key .find(delimiter) - .and_then(move |i| Some(&object.key[..prefix.len() + i + delimiter.len()])) + .map(|i| &object.key[..prefix.len() + i + delimiter.len()]) } else { None }; From 163c3b6c169bebafccd284cbb0b03a83d3c46fd7 Mon Sep 17 00:00:00 2001 From: Quentin Date: Sun, 15 Nov 2020 14:43:23 +0100 Subject: [PATCH 4/4] Propose a script to spawn a cluster quickly --- example/dev-cluster.sh | 54 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100755 example/dev-cluster.sh diff --git a/example/dev-cluster.sh b/example/dev-cluster.sh new file mode 100755 index 00000000..e896b6ba --- /dev/null +++ b/example/dev-cluster.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -e + +SCRIPT_FOLDER="`dirname \"$0\"`" +REPO_FOLDER="${SCRIPT_FOLDER}/../" +GARAGE_DEBUG="${REPO_FOLDER}/target/debug/" +GARAGE_RELEASE="${REPO_FOLDER}/target/release/" +PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:$PATH" +FANCYCOLORS=("41m" "42m" "44m" "45m" "100m" "104m") + +export RUST_BACKTRACE=1 +export RUST_LOG=garage=info +MAIN_LABEL="\e[${FANCYCOLORS[0]}[main]\e[49m" + +for count in $(seq 1 3); do +CONF_PATH="/tmp/config.$count.toml" +LABEL="\e[${FANCYCOLORS[$count]}[$count]\e[49m" + +cat > $CONF_PATH <&1|while read r; do echo -en "$LABEL $r\n"; done) & +done + +until garage status 2>&1|grep -q Healthy ; do + echo -en "${MAIN_LABEL} cluster starting...\n" + sleep 1 +done +echo -en "${MAIN_LABEL} cluster started\n" + +wait