1
0
Fork 0
forked from Deuxfleurs/garage

Merge branch 'master' into feature/website

This commit is contained in:
Quentin 2020-11-19 14:39:30 +01:00
commit fc427b0b66
18 changed files with 282 additions and 213 deletions

54
example/dev-cluster.sh Executable file
View file

@ -0,0 +1,54 @@
#!/bin/bash
set -e
SCRIPT_FOLDER="`dirname \"$0\"`"
REPO_FOLDER="${SCRIPT_FOLDER}/../"
GARAGE_DEBUG="${REPO_FOLDER}/target/debug/"
GARAGE_RELEASE="${REPO_FOLDER}/target/release/"
PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:$PATH"
FANCYCOLORS=("41m" "42m" "44m" "45m" "100m" "104m")
export RUST_BACKTRACE=1
export RUST_LOG=garage=info
MAIN_LABEL="\e[${FANCYCOLORS[0]}[main]\e[49m"
for count in $(seq 1 3); do
CONF_PATH="/tmp/config.$count.toml"
LABEL="\e[${FANCYCOLORS[$count]}[$count]\e[49m"
cat > $CONF_PATH <<EOF
block_size = 1048576 # objects are split in blocks of maximum this number of bytes
metadata_dir = "/tmp/garage-meta-$count"
data_dir = "/tmp/garage-data-$count"
rpc_bind_addr = "127.0.0.$count:3901" # the port other Garage nodes will use to talk to this node
bootstrap_peers = [
"127.0.0.1:3901",
"127.0.0.2:3901",
"127.0.0.3:3901"
]
max_concurrent_rpc_requests = 12
data_replication_factor = 3
meta_replication_factor = 3
meta_epidemic_fanout = 3
[s3_api]
api_bind_addr = "127.0.0.$count:3900" # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part.
s3_region = "garage" # set this to anything. S3 API calls will fail if they are not made against the region set here.
[s3_web]
web_bind_addr = "127.0.0.$count:3902"
EOF
echo -en "$LABEL configuration written to $CONF_PATH\n"
(garage server -c /tmp/config.$count.toml 2>&1|while read r; do echo -en "$LABEL $r\n"; done) &
done
until garage status 2>&1|grep -q Healthy ; do
echo -en "${MAIN_LABEL} cluster starting...\n"
sleep 1
done
echo -en "${MAIN_LABEL} cluster started\n"
wait

View file

@ -17,6 +17,7 @@ garage_util = { version = "0.1", path = "../util" }
garage_table = { version = "0.1.1", path = "../table" } garage_table = { version = "0.1.1", path = "../table" }
garage_model = { version = "0.1.1", path = "../model" } garage_model = { version = "0.1.1", path = "../model" }
err-derive = "0.2.3"
bytes = "0.4" bytes = "0.4"
hex = "0.3" hex = "0.3"
log = "0.4" log = "0.4"

View file

@ -7,10 +7,11 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server}; use hyper::{Body, Method, Request, Response, Server};
use garage_util::error::Error; use garage_util::error::Error as GarageError;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use crate::error::*;
use crate::signature::check_signature; use crate::signature::check_signature;
use crate::s3_copy::*; use crate::s3_copy::*;
@ -22,14 +23,14 @@ use crate::s3_put::*;
pub async fn run_api_server( pub async fn run_api_server(
garage: Arc<Garage>, garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,
) -> Result<(), Error> { ) -> Result<(), GarageError> {
let addr = &garage.config.s3_api.api_bind_addr; let addr = &garage.config.s3_api.api_bind_addr;
let service = make_service_fn(|conn: &AddrStream| { let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone(); let garage = garage.clone();
let client_addr = conn.remote_addr(); let client_addr = conn.remote_addr();
async move { async move {
Ok::<_, Error>(service_fn(move |req: Request<Body>| { Ok::<_, GarageError>(service_fn(move |req: Request<Body>| {
let garage = garage.clone(); let garage = garage.clone();
handler(garage, req, client_addr) handler(garage, req, client_addr)
})) }))
@ -49,7 +50,7 @@ async fn handler(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<Body>,
addr: SocketAddr, addr: SocketAddr,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, GarageError> {
info!("{} {} {}", addr, req.method(), req.uri()); info!("{} {} {}", addr, req.method(), req.uri());
debug!("{:?}", req); debug!("{:?}", req);
match handler_inner(garage, req).await { match handler_inner(garage, req).await {
@ -131,10 +132,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
source_bucket source_bucket
))); )));
} }
let source_key = match source_key { let source_key = source_key.ok_or_bad_request("No source key specified")?;
None => return Err(Error::BadRequest(format!("No source key specified"))),
Some(x) => x,
};
Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?) Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?)
} else { } else {
// PutObject query // PutObject query
@ -205,9 +203,8 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let max_keys = params let max_keys = params
.get("max-keys") .get("max-keys")
.map(|x| { .map(|x| {
x.parse::<usize>().map_err(|e| { x.parse::<usize>()
Error::BadRequest(format!("Invalid value for max-keys: {}", e)) .ok_or_bad_request("Invalid value for max-keys")
})
}) })
.unwrap_or(Ok(1000))?; .unwrap_or(Ok(1000))?;
let prefix = params.get("prefix").map(|x| x.as_str()).unwrap_or(&""); let prefix = params.get("prefix").map(|x| x.as_str()).unwrap_or(&"");

116
src/api/error.rs Normal file
View file

@ -0,0 +1,116 @@
use err_derive::Error;
use hyper::StatusCode;
use garage_util::error::Error as GarageError;
#[derive(Debug, Error)]
pub enum Error {
// Category: internal error
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
#[error(display = "Internal error (Hyper error): {}", _0)]
Hyper(#[error(source)] hyper::Error),
#[error(display = "Internal error (HTTP error): {}", _0)]
HTTP(#[error(source)] http::Error),
// Category: cannot process
#[error(display = "Forbidden: {}", _0)]
Forbidden(String),
#[error(display = "Not found")]
NotFound,
// Category: bad request
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8(#[error(source)] std::str::Utf8Error),
#[error(display = "Invalid XML: {}", _0)]
InvalidXML(#[error(source)] roxmltree::Error),
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
impl Error {
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
Error::InternalError(GarageError::RPC(_)) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) | Error::Hyper(_) | Error::HTTP(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
_ => StatusCode::BAD_REQUEST,
}
}
}
pub trait OkOrBadRequest {
type S2;
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
}
impl<T, E> OkOrBadRequest for Result<T, E>
where
E: std::fmt::Display,
{
type S2 = Result<T, Error>;
fn ok_or_bad_request(self, reason: &'static str) -> Result<T, Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(Error::BadRequest(format!("{}: {}", reason, e))),
}
}
}
impl<T> OkOrBadRequest for Option<T> {
type S2 = Result<T, Error>;
fn ok_or_bad_request(self, reason: &'static str) -> Result<T, Error> {
match self {
Some(x) => Ok(x),
None => Err(Error::BadRequest(format!("{}", reason))),
}
}
}
pub trait OkOrInternalError {
type S2;
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;
}
impl<T, E> OkOrInternalError for Result<T, E>
where
E: std::fmt::Display,
{
type S2 = Result<T, Error>;
fn ok_or_internal_error(self, reason: &'static str) -> Result<T, Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(Error::InternalError(GarageError::Message(format!(
"{}: {}",
reason, e
)))),
}
}
}
impl<T> OkOrInternalError for Option<T> {
type S2 = Result<T, Error>;
fn ok_or_internal_error(self, reason: &'static str) -> Result<T, Error> {
match self {
Some(x) => Ok(x),
None => Err(Error::InternalError(GarageError::Message(format!(
"{}",
reason
)))),
}
}
}

View file

@ -1,6 +1,8 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod error;
pub mod encoding; pub mod encoding;
pub mod api_server; pub mod api_server;

View file

@ -6,13 +6,14 @@ use hyper::{Body, Response};
use garage_table::*; use garage_table::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error;
use garage_model::block_ref_table::*; use garage_model::block_ref_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
use garage_model::version_table::*; use garage_model::version_table::*;
use crate::error::*;
pub async fn handle_copy( pub async fn handle_copy(
garage: Arc<Garage>, garage: Arc<Garage>,
dest_bucket: &str, dest_bucket: &str,
@ -20,25 +21,20 @@ pub async fn handle_copy(
source_bucket: &str, source_bucket: &str,
source_key: &str, source_key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let source_object = match garage let source_object = garage
.object_table .object_table
.get(&source_bucket.to_string(), &source_key.to_string()) .get(&source_bucket.to_string(), &source_key.to_string())
.await? .await?
{ .ok_or(Error::NotFound)?;
None => return Err(Error::NotFound),
Some(o) => o,
};
let source_last_v = match source_object let source_last_v = source_object
.versions() .versions()
.iter() .iter()
.rev() .rev()
.filter(|v| v.is_complete()) .filter(|v| v.is_complete())
.next() .next()
{ .ok_or(Error::NotFound)?;
Some(v) => v,
None => return Err(Error::NotFound),
};
let source_last_state = match &source_last_v.state { let source_last_state = match &source_last_v.state {
ObjectVersionState::Complete(x) => x, ObjectVersionState::Complete(x) => x,
_ => unreachable!(), _ => unreachable!(),
@ -68,10 +64,7 @@ pub async fn handle_copy(
.version_table .version_table
.get(&source_last_v.uuid, &EmptyKey) .get(&source_last_v.uuid, &EmptyKey)
.await?; .await?;
let source_version = match source_version { let source_version = source_version.ok_or(Error::NotFound)?;
Some(v) => v,
None => return Err(Error::NotFound),
};
let dest_version = Version::new( let dest_version = Version::new(
new_uuid, new_uuid,

View file

@ -4,29 +4,23 @@ use std::sync::Arc;
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
use crate::encoding::*; use crate::encoding::*;
use crate::error::*;
async fn handle_delete_internal( async fn handle_delete_internal(
garage: &Garage, garage: &Garage,
bucket: &str, bucket: &str,
key: &str, key: &str,
) -> Result<(UUID, UUID), Error> { ) -> Result<(UUID, UUID), Error> {
let object = match garage let object = garage
.object_table .object_table
.get(&bucket.to_string(), &key.to_string()) .get(&bucket.to_string(), &key.to_string())
.await? .await?
{ .ok_or(Error::NotFound)?; // No need to delete
None => {
// No need to delete
return Err(Error::NotFound);
}
Some(o) => o,
};
let interesting_versions = object.versions().iter().filter(|v| match v.state { let interesting_versions = object.versions().iter().filter(|v| match v.state {
ObjectVersionState::Aborted => false, ObjectVersionState::Aborted => false,
@ -43,10 +37,7 @@ async fn handle_delete_internal(
timestamp = std::cmp::max(timestamp, v.timestamp + 1); timestamp = std::cmp::max(timestamp, v.timestamp + 1);
} }
let deleted_version = match must_delete { let deleted_version = must_delete.ok_or(Error::NotFound)?;
None => return Err(Error::NotFound),
Some(v) => v,
};
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
@ -85,8 +76,7 @@ pub async fn handle_delete_objects(
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?; let body = hyper::body::to_bytes(req.into_body()).await?;
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?; let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml) let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
.map_err(|e| Error::BadRequest(format!("Invald delete XML query: {}", e)))?;
let mut retxml = String::new(); let mut retxml = String::new();
writeln!(&mut retxml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap(); writeln!(&mut retxml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
@ -143,10 +133,8 @@ fn parse_delete_objects_xml(xml: &roxmltree::Document) -> Result<DeleteRequest,
let mut ret = DeleteRequest { objects: vec![] }; let mut ret = DeleteRequest { objects: vec![] };
let root = xml.root(); let root = xml.root();
let delete = match root.first_child() { let delete = root.first_child().ok_or(format!("Delete tag not found"))?;
Some(del) => del,
None => return Err(format!("Delete tag not found")),
};
if !delete.has_tag_name("Delete") { if !delete.has_tag_name("Delete") {
return Err(format!("Invalid root tag: {:?}", root)); return Err(format!("Invalid root tag: {:?}", root));
} }

View file

@ -5,13 +5,13 @@ use futures::stream::*;
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use garage_util::error::Error;
use garage_table::EmptyKey; use garage_table::EmptyKey;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
use crate::error::*;
fn object_headers( fn object_headers(
version: &ObjectVersion, version: &ObjectVersion,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
@ -41,25 +41,20 @@ pub async fn handle_head(
bucket: &str, bucket: &str,
key: &str, key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let object = match garage let object = garage
.object_table .object_table
.get(&bucket.to_string(), &key.to_string()) .get(&bucket.to_string(), &key.to_string())
.await? .await?
{ .ok_or(Error::NotFound)?;
None => return Err(Error::NotFound),
Some(o) => o,
};
let version = match object let version = object
.versions() .versions()
.iter() .iter()
.rev() .rev()
.filter(|v| v.is_data()) .filter(|v| v.is_data())
.next() .next()
{ .ok_or(Error::NotFound)?;
Some(v) => v,
None => return Err(Error::NotFound),
};
let version_meta = match &version.state { let version_meta = match &version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
@ -80,25 +75,20 @@ pub async fn handle_get(
bucket: &str, bucket: &str,
key: &str, key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let object = match garage let object = garage
.object_table .object_table
.get(&bucket.to_string(), &key.to_string()) .get(&bucket.to_string(), &key.to_string())
.await? .await?
{ .ok_or(Error::NotFound)?;
None => return Err(Error::NotFound),
Some(o) => o,
};
let last_v = match object let last_v = object
.versions() .versions()
.iter() .iter()
.rev() .rev()
.filter(|v| v.is_complete()) .filter(|v| v.is_complete())
.next() .next()
{ .ok_or(Error::NotFound)?;
Some(v) => v,
None => return Err(Error::NotFound),
};
let last_v_data = match &last_v.state { let last_v_data = match &last_v.state {
ObjectVersionState::Complete(x) => x, ObjectVersionState::Complete(x) => x,
_ => unreachable!(), _ => unreachable!(),
@ -111,11 +101,8 @@ pub async fn handle_get(
let range = match req.headers().get("range") { let range = match req.headers().get("range") {
Some(range) => { Some(range) => {
let range_str = range let range_str = range.to_str()?;
.to_str() let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)?;
.map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?;
let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
.map_err(|_e| Error::BadRequest(format!("Invalid range")))?;
if ranges.len() > 1 { if ranges.len() > 1 {
return Err(Error::BadRequest(format!("Multiple ranges not supported"))); return Err(Error::BadRequest(format!("Multiple ranges not supported")));
} else { } else {
@ -149,10 +136,7 @@ pub async fn handle_get(
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
let version = match version { let version = version.ok_or(Error::NotFound)?;
Some(v) => v,
None => return Err(Error::NotFound),
};
let mut blocks = version let mut blocks = version
.blocks() .blocks()
@ -210,7 +194,9 @@ pub async fn handle_get_range(
let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec());
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} else { } else {
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been"))) None.ok_or_internal_error(
"Requested range not present in inline bytes when it should have been",
)
} }
} }
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {

View file

@ -64,10 +64,9 @@ pub async fn handle_list(
} }
let common_prefix = if delimiter.len() > 0 { let common_prefix = if delimiter.len() > 0 {
let relative_key = &object.key[prefix.len()..]; let relative_key = &object.key[prefix.len()..];
match relative_key.find(delimiter) { relative_key
Some(i) => Some(&object.key[..prefix.len() + i + delimiter.len()]), .find(delimiter)
None => None, .map(|i| &object.key[..prefix.len() + i + delimiter.len()])
}
} else { } else {
None None
}; };

View file

@ -9,8 +9,9 @@ use sha2::{Digest as Sha256Digest, Sha256};
use garage_table::*; use garage_table::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error as GarageError;
use crate::error::*;
use garage_model::block::INLINE_THRESHOLD; use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*; use garage_model::block_ref_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
@ -36,10 +37,7 @@ pub async fn handle_put(
let body = req.into_body(); let body = req.into_body();
let mut chunker = BodyChunker::new(body, garage.config.block_size); let mut chunker = BodyChunker::new(body, garage.config.block_size);
let first_block = match chunker.next().await? { let first_block = chunker.next().await?.unwrap_or(vec![]);
Some(x) => x,
None => vec![],
};
let mut object_version = ObjectVersion { let mut object_version = ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
@ -85,7 +83,7 @@ pub async fn handle_put(
// Validate MD5 sum against content-md5 header and sha256sum against signed content-sha256 // Validate MD5 sum against content-md5 header and sha256sum against signed content-sha256
if let Some(expected_sha256) = content_sha256 { if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != sha256sum { if expected_sha256 != sha256sum {
return Err(Error::Message(format!( return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256" "Unable to validate x-amz-content-sha256"
))); )));
} else { } else {
@ -94,7 +92,7 @@ pub async fn handle_put(
} }
if let Some(expected_md5) = content_md5 { if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != md5sum { if expected_md5.trim_matches('"') != md5sum {
return Err(Error::Message(format!("Unable to validate content-md5"))); return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else { } else {
trace!("Successfully validated content-md5"); trace!("Successfully validated content-md5");
} }
@ -184,7 +182,7 @@ async fn put_block_meta(
offset: u64, offset: u64,
hash: Hash, hash: Hash,
size: u64, size: u64,
) -> Result<(), Error> { ) -> Result<(), GarageError> {
// TODO: don't clone, restart from empty block list ?? // TODO: don't clone, restart from empty block list ??
let mut version = version.clone(); let mut version = version.clone();
version version
@ -225,7 +223,7 @@ impl BodyChunker {
buf: VecDeque::new(), buf: VecDeque::new(),
} }
} }
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.block_size { while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.body.next().await { if let Some(block) = self.body.next().await {
let bytes = block?; let bytes = block?;
@ -305,10 +303,9 @@ pub async fn handle_put_part(
// Check parameters // Check parameters
let part_number = part_number_str let part_number = part_number_str
.parse::<u64>() .parse::<u64>()
.map_err(|e| Error::BadRequest(format!("Invalid part number: {}", e)))?; .ok_or_bad_request("Invalid part number")?;
let version_uuid = let version_uuid = decode_upload_id(upload_id)?;
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
let content_md5 = match req.headers().get("content-md5") { let content_md5 = match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()), Some(x) => Some(x.to_str()?.to_string()),
@ -325,14 +322,9 @@ pub async fn handle_put_part(
let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?;
// Check object is valid and multipart block can be accepted // Check object is valid and multipart block can be accepted
let first_block = match first_block { let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
None => return Err(Error::BadRequest(format!("Empty body"))), let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
Some(x) => x,
};
let object = match object {
None => return Err(Error::BadRequest(format!("Object not found"))),
Some(x) => x,
};
if !object if !object
.versions() .versions()
.iter() .iter()
@ -359,7 +351,7 @@ pub async fn handle_put_part(
// Validate MD5 sum against content-md5 header and sha256sum against signed content-sha256 // Validate MD5 sum against content-md5 header and sha256sum against signed content-sha256
if let Some(expected_sha256) = content_sha256 { if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != sha256sum { if expected_sha256 != sha256sum {
return Err(Error::Message(format!( return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256" "Unable to validate x-amz-content-sha256"
))); )));
} else { } else {
@ -368,7 +360,7 @@ pub async fn handle_put_part(
} }
if let Some(expected_md5) = content_md5 { if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != md5sum { if expected_md5.trim_matches('"') != md5sum {
return Err(Error::Message(format!("Unable to validate content-md5"))); return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else { } else {
trace!("Successfully validated content-md5"); trace!("Successfully validated content-md5");
} }
@ -384,8 +376,7 @@ pub async fn handle_complete_multipart_upload(
key: &str, key: &str,
upload_id: &str, upload_id: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let version_uuid = let version_uuid = decode_upload_id(upload_id)?;
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
let bucket = bucket.to_string(); let bucket = bucket.to_string();
let key = key.to_string(); let key = key.to_string();
@ -393,10 +384,8 @@ pub async fn handle_complete_multipart_upload(
garage.object_table.get(&bucket, &key), garage.object_table.get(&bucket, &key),
garage.version_table.get(&version_uuid, &EmptyKey), garage.version_table.get(&version_uuid, &EmptyKey),
)?; )?;
let object = match object { let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
None => return Err(Error::BadRequest(format!("Object not found"))),
Some(x) => x,
};
let object_version = object let object_version = object
.versions() .versions()
.iter() .iter()
@ -409,10 +398,8 @@ pub async fn handle_complete_multipart_upload(
} }
Some(x) => x.clone(), Some(x) => x.clone(),
}; };
let version = match version { let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
None => return Err(Error::BadRequest(format!("Version not found"))),
Some(x) => x,
};
if version.blocks().len() == 0 { if version.blocks().len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded"))); return Err(Error::BadRequest(format!("No data was uploaded")));
} }
@ -469,17 +456,14 @@ pub async fn handle_abort_multipart_upload(
key: &str, key: &str,
upload_id: &str, upload_id: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let version_uuid = let version_uuid = decode_upload_id(upload_id)?;
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
let object = garage let object = garage
.object_table .object_table
.get(&bucket.to_string(), &key.to_string()) .get(&bucket.to_string(), &key.to_string())
.await?; .await?;
let object = match object { let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
None => return Err(Error::BadRequest(format!("Object not found"))),
Some(x) => x,
};
let object_version = object let object_version = object
.versions() .versions()
.iter() .iter()
@ -532,10 +516,10 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
}) })
} }
fn uuid_from_str(id: &str) -> Result<UUID, ()> { fn decode_upload_id(id: &str) -> Result<UUID, Error> {
let id_bin = hex::decode(id).map_err(|_| ())?; let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
if id_bin.len() != 32 { if id_bin.len() != 32 {
return Err(()); return None.ok_or_bad_request("Invalid upload ID");
} }
let mut uuid = [0u8; 32]; let mut uuid = [0u8; 32];
uuid.copy_from_slice(&id_bin[..]); uuid.copy_from_slice(&id_bin[..]);

View file

@ -7,12 +7,12 @@ use sha2::{Digest, Sha256};
use garage_table::*; use garage_table::*;
use garage_util::data::Hash; use garage_util::data::Hash;
use garage_util::error::Error;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::key_table::*; use garage_model::key_table::*;
use crate::encoding::uri_encode; use crate::encoding::uri_encode;
use crate::error::*;
const SHORT_DATE: &str = "%Y%m%d"; const SHORT_DATE: &str = "%Y%m%d";
const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
@ -42,9 +42,9 @@ pub async fn check_signature(
let date = headers let date = headers
.get("x-amz-date") .get("x-amz-date")
.ok_or(Error::BadRequest("Missing X-Amz-Date field".into()))?; .ok_or_bad_request("Missing X-Amz-Date field")?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.map_err(|e| Error::BadRequest(format!("Invalid date: {}", e)))? .ok_or_bad_request("Invalid date")?
.into(); .into();
let date: DateTime<Utc> = DateTime::from_utc(date, Utc); let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
@ -90,7 +90,7 @@ pub async fn check_signature(
&garage.config.s3_api.s3_region, &garage.config.s3_api.s3_region,
"s3", "s3",
) )
.map_err(|e| Error::Message(format!("Unable to build signing HMAC: {}", e)))?; .ok_or_internal_error("Unable to build signing HMAC")?;
hmac.input(string_to_sign.as_bytes()); hmac.input(string_to_sign.as_bytes());
let signature = hex::encode(hmac.result().code()); let signature = hex::encode(hmac.result().code());
@ -104,9 +104,8 @@ pub async fn check_signature(
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" { let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
None None
} else { } else {
let bytes = hex::decode(authorization.content_sha256).or(Err(Error::BadRequest( let bytes = hex::decode(authorization.content_sha256)
format!("Invalid content sha256 hash"), .ok_or_bad_request("Invalid content sha256 hash")?;
)))?;
let mut hash = [0u8; 32]; let mut hash = [0u8; 32];
if bytes.len() != 32 { if bytes.len() != 32 {
return Err(Error::BadRequest(format!("Invalid content sha256 hash"))); return Err(Error::BadRequest(format!("Invalid content sha256 hash")));
@ -132,7 +131,7 @@ fn parse_authorization(
) -> Result<Authorization, Error> { ) -> Result<Authorization, Error> {
let first_space = authorization let first_space = authorization
.find(' ') .find(' ')
.ok_or(Error::BadRequest("Authorization field too short".into()))?; .ok_or_bad_request("Authorization field to short")?;
let (auth_kind, rest) = authorization.split_at(first_space); let (auth_kind, rest) = authorization.split_at(first_space);
if auth_kind != "AWS4-HMAC-SHA256" { if auth_kind != "AWS4-HMAC-SHA256" {
@ -142,41 +141,32 @@ fn parse_authorization(
let mut auth_params = HashMap::new(); let mut auth_params = HashMap::new();
for auth_part in rest.split(',') { for auth_part in rest.split(',') {
let auth_part = auth_part.trim(); let auth_part = auth_part.trim();
let eq = auth_part.find('=').ok_or(Error::BadRequest(format!( let eq = auth_part
"Missing =value in authorization field {}", .find('=')
auth_part .ok_or_bad_request("Field without value in authorization header")?;
)))?;
let (key, value) = auth_part.split_at(eq); let (key, value) = auth_part.split_at(eq);
auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string()); auth_params.insert(key.to_string(), value.trim_start_matches('=').to_string());
} }
let cred = auth_params let cred = auth_params
.get("Credential") .get("Credential")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("Could not find Credential in Authorization field")?;
"Could not find Credential in Authorization field"
)))?;
let (key_id, scope) = parse_credential(cred)?; let (key_id, scope) = parse_credential(cred)?;
let content_sha256 = headers let content_sha256 = headers
.get("x-amz-content-sha256") .get("x-amz-content-sha256")
.ok_or(Error::BadRequest( .ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?;
"Missing X-Amz-Content-Sha256 field".into(),
))?;
let auth = Authorization { let auth = Authorization {
key_id, key_id,
scope, scope,
signed_headers: auth_params signed_headers: auth_params
.get("SignedHeaders") .get("SignedHeaders")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
"Could not find SignedHeaders in Authorization field"
)))?
.to_string(), .to_string(),
signature: auth_params signature: auth_params
.get("Signature") .get("Signature")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("Could not find Signature in Authorization field")?
"Could not find Signature in Authorization field"
)))?
.to_string(), .to_string(),
content_sha256: content_sha256.to_string(), content_sha256: content_sha256.to_string(),
}; };
@ -186,9 +176,7 @@ fn parse_authorization(
fn parse_query_authorization(headers: &HashMap<String, String>) -> Result<Authorization, Error> { fn parse_query_authorization(headers: &HashMap<String, String>) -> Result<Authorization, Error> {
let algo = headers let algo = headers
.get("x-amz-algorithm") .get("x-amz-algorithm")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("X-Amz-Algorithm not found in query parameters")?;
"X-Amz-Algorithm not found in query parameters"
)))?;
if algo != "AWS4-HMAC-SHA256" { if algo != "AWS4-HMAC-SHA256" {
return Err(Error::BadRequest(format!( return Err(Error::BadRequest(format!(
"Unsupported authorization method" "Unsupported authorization method"
@ -197,20 +185,14 @@ fn parse_query_authorization(headers: &HashMap<String, String>) -> Result<Author
let cred = headers let cred = headers
.get("x-amz-credential") .get("x-amz-credential")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
"X-Amz-Credential not found in query parameters"
)))?;
let (key_id, scope) = parse_credential(cred)?; let (key_id, scope) = parse_credential(cred)?;
let signed_headers = headers let signed_headers = headers
.get("x-amz-signedheaders") .get("x-amz-signedheaders")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
"X-Amz-SignedHeaders not found in query parameters"
)))?;
let signature = headers let signature = headers
.get("x-amz-signature") .get("x-amz-signature")
.ok_or(Error::BadRequest(format!( .ok_or_bad_request("X-Amz-Signature not found in query parameters")?;
"X-Amz-Signature not found in query parameters"
)))?;
let content_sha256 = headers let content_sha256 = headers
.get("x-amz-content-sha256") .get("x-amz-content-sha256")
.map(|x| x.as_str()) .map(|x| x.as_str())
@ -226,9 +208,9 @@ fn parse_query_authorization(headers: &HashMap<String, String>) -> Result<Author
} }
fn parse_credential(cred: &str) -> Result<(String, String), Error> { fn parse_credential(cred: &str) -> Result<(String, String), Error> {
let first_slash = cred.find('/').ok_or(Error::BadRequest(format!( let first_slash = cred
"Credentials does not contain / in authorization field" .find('/')
)))?; .ok_or_bad_request("Credentials does not contain / in authorization field")?;
let (key_id, scope) = cred.split_at(first_slash); let (key_id, scope) = cred.split_at(first_slash);
Ok(( Ok((
key_id.to_string(), key_id.to_string(),

View file

@ -55,7 +55,7 @@ impl AdminRpcHandler {
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
_ => Err(Error::BadRequest(format!("Invalid RPC"))), _ => Err(Error::BadRPC(format!("Invalid RPC"))),
} }
} }
}); });
@ -81,7 +81,7 @@ impl AdminRpcHandler {
BucketOperation::Create(query) => { BucketOperation::Create(query) => {
let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
if bucket.as_ref().filter(|b| !b.deleted).is_some() { if bucket.as_ref().filter(|b| !b.deleted).is_some() {
return Err(Error::BadRequest(format!( return Err(Error::BadRPC(format!(
"Bucket {} already exists", "Bucket {} already exists",
query.name query.name
))); )));
@ -104,13 +104,10 @@ impl AdminRpcHandler {
.get_range(&query.name, None, Some(()), 10) .get_range(&query.name, None, Some(()), 10)
.await?; .await?;
if !objects.is_empty() { if !objects.is_empty() {
return Err(Error::BadRequest(format!( return Err(Error::BadRPC(format!("Bucket {} is not empty", query.name)));
"Bucket {} is not empty",
query.name
)));
} }
if !query.yes { if !query.yes {
return Err(Error::BadRequest(format!( return Err(Error::BadRPC(format!(
"Add --yes flag to really perform this operation" "Add --yes flag to really perform this operation"
))); )));
} }
@ -199,7 +196,7 @@ impl AdminRpcHandler {
KeyOperation::Delete(query) => { KeyOperation::Delete(query) => {
let key = self.get_existing_key(&query.key_id).await?; let key = self.get_existing_key(&query.key_id).await?;
if !query.yes { if !query.yes {
return Err(Error::BadRequest(format!( return Err(Error::BadRPC(format!(
"Add --yes flag to really perform this operation" "Add --yes flag to really perform this operation"
))); )));
} }
@ -233,7 +230,7 @@ impl AdminRpcHandler {
.await? .await?
.filter(|b| !b.deleted) .filter(|b| !b.deleted)
.map(Ok) .map(Ok)
.unwrap_or(Err(Error::BadRequest(format!( .unwrap_or(Err(Error::BadRPC(format!(
"Bucket {} does not exist", "Bucket {} does not exist",
bucket bucket
)))) ))))
@ -246,7 +243,7 @@ impl AdminRpcHandler {
.await? .await?
.filter(|k| !k.deleted) .filter(|k| !k.deleted)
.map(Ok) .map(Ok)
.unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
} }
async fn update_bucket_key( async fn update_bucket_key(
@ -306,7 +303,7 @@ impl AdminRpcHandler {
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> { async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> {
if !opt.yes { if !opt.yes {
return Err(Error::BadRequest(format!( return Err(Error::BadRPC(format!(
"Please provide the --yes flag to initiate repair operations." "Please provide the --yes flag to initiate repair operations."
))); )));
} }

View file

@ -123,7 +123,7 @@ impl BlockManager {
Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
Message::GetBlock(h) => self.read_block(h).await, Message::GetBlock(h) => self.read_block(h).await,
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
_ => Err(Error::BadRequest(format!("Unexpected RPC message"))), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))),
} }
} }

View file

@ -373,7 +373,7 @@ impl System {
Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await, Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await, Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
_ => Err(Error::BadRequest(format!("Unexpected RPC message"))), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))),
} }
} }
}); });

View file

@ -61,7 +61,10 @@ where
let err_str = format!("{}", e); let err_str = format!("{}", e);
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?; let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes)); let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code(); *err_response.status_mut() = match e {
Error::BadRPC(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
warn!( warn!(
"RPC error ({}): {} ({} ms)", "RPC error ({}): {} ({} ms)",
name, name,

View file

@ -329,7 +329,7 @@ where
.await?; .await?;
Ok(TableRPC::SyncRPC(response)) Ok(TableRPC::SyncRPC(response))
} }
_ => Err(Error::BadRequest(format!("Unexpected table RPC"))), _ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
} }
} }

View file

@ -474,7 +474,7 @@ where
todo.push_back(root_ck); todo.push_back(root_ck);
} }
} else { } else {
return Err(Error::BadRequest(format!( return Err(Error::Message(format!(
"Invalid respone to GetRootChecksumRange RPC: {}", "Invalid respone to GetRootChecksumRange RPC: {}",
debug_serialize(root_cks_resp) debug_serialize(root_cks_resp)
))); )));
@ -530,7 +530,7 @@ where
self.send_items(who, items_to_send).await?; self.send_items(who, items_to_send).await?;
} }
} else { } else {
return Err(Error::BadRequest(format!( return Err(Error::Message(format!(
"Unexpected response to sync RPC checksums: {}", "Unexpected response to sync RPC checksums: {}",
debug_serialize(&rpc_resp) debug_serialize(&rpc_resp)
))); )));

View file

@ -54,9 +54,6 @@ pub enum Error {
#[error(display = "TOML decode error: {}", _0)] #[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error), TomlDecode(#[error(source)] toml::de::Error),
#[error(display = "Timeout: {}", _0)]
RPCTimeout(#[error(source)] tokio::time::Elapsed),
#[error(display = "Tokio join error: {}", _0)] #[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError), TokioJoin(#[error(source)] tokio::task::JoinError),
@ -66,14 +63,8 @@ pub enum Error {
#[error(display = "Remote error: {} (status code {})", _0, _1)] #[error(display = "Remote error: {} (status code {})", _0, _1)]
RemoteError(String, StatusCode), RemoteError(String, StatusCode),
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad RPC: {}", _0)]
BadRequest(String), BadRPC(String),
#[error(display = "Forbidden: {}", _0)]
Forbidden(String),
#[error(display = "Not found")]
NotFound,
#[error(display = "Corrupt data: does not match hash {:?}", _0)] #[error(display = "Corrupt data: does not match hash {:?}", _0)]
CorruptData(Hash), CorruptData(Hash),
@ -82,18 +73,6 @@ pub enum Error {
Message(String), Message(String),
} }
impl Error {
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::BadRequest(_) => StatusCode::BAD_REQUEST,
Error::NotFound => StatusCode::NOT_FOUND,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
Error::RPC(_) => StatusCode::SERVICE_UNAVAILABLE,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
}
impl From<sled::TransactionError<Error>> for Error { impl From<sled::TransactionError<Error>> for Error {
fn from(e: sled::TransactionError<Error>) -> Error { fn from(e: sled::TransactionError<Error>) -> Error {
match e { match e {
@ -114,15 +93,3 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
Error::Message(format!("MPSC send error")) Error::Message(format!("MPSC send error"))
} }
} }
impl From<std::str::Utf8Error> for Error {
fn from(e: std::str::Utf8Error) -> Error {
Error::BadRequest(format!("Invalid UTF-8: {}", e))
}
}
impl From<roxmltree::Error> for Error {
fn from(e: roxmltree::Error) -> Error {
Error::BadRequest(format!("Invalid XML: {}", e))
}
}