api: start refactor of signature to calculate checksums earlier

This commit is contained in:
Alex 2025-02-17 18:47:06 +01:00
parent a04d6cd5b8
commit c5df820e2c
21 changed files with 288 additions and 231 deletions

View file

@ -14,9 +14,9 @@ use crate::common_error::{
};
use crate::helpers::*;
pub fn find_matching_cors_rule<'a>(
pub fn find_matching_cors_rule<'a, B>(
bucket_params: &'a BucketParams,
req: &Request<impl Body>,
req: &Request<B>,
) -> Result<Option<&'a GarageCorsRule>, CommonError> {
if let Some(cors_config) = bucket_params.cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@ -132,8 +132,8 @@ pub async fn handle_options_api(
}
}
pub fn handle_options_for_bucket(
req: &Request<IncomingBody>,
pub fn handle_options_for_bucket<B>(
req: &Request<B>,
bucket_params: &BucketParams,
) -> Result<Response<EmptyBody>, CommonError> {
let origin = req

View file

@ -0,0 +1,69 @@
use std::sync::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use serde::Deserialize;
use tokio::sync::{mpsc, oneshot};
use super::*;
use crate::signature::checksum::*;
pub struct ReqBody {
// why need mutex to be sync??
pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
pub checksummer: Checksummer,
pub expected_checksums: ExpectedChecksums,
}
pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>;
impl ReqBody {
pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
let body = self.collect().await?;
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp)
}
pub async fn collect(self) -> Result<Bytes, Error> {
self.collect_with_checksums().await.map(|(b, _)| b)
}
pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> {
let stream: BoxStream<_> = self.stream.into_inner().unwrap();
let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes();
self.checksummer.update(&bytes);
let checksums = self.checksummer.finalize();
checksums.verify(&self.expected_checksums)?;
Ok((bytes, checksums))
}
pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> {
self.streaming_with_checksums(false).0
}
pub fn streaming_with_checksums(
self,
add_md5: bool,
) -> (
impl Stream<Item = Result<Bytes, Error>>,
StreamingChecksumReceiver,
) {
let (tx, rx) = oneshot::channel();
// TODO: actually calculate checksums!!
let stream: BoxStream<_> = self.stream.into_inner().unwrap();
(
stream.map(|x| {
x.and_then(|f| {
f.into_data()
.map_err(|_| Error::bad_request("non-data frame"))
})
}),
rx,
)
}
}

View file

@ -8,13 +8,15 @@ use md5::{Digest, Md5};
use sha1::Sha1;
use sha2::Sha256;
use http::HeaderName;
use http::{HeaderMap, HeaderName, HeaderValue};
use garage_util::data::*;
use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
use super::error::*;
use super::*;
pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-checksum-algorithm");
@ -58,14 +60,18 @@ pub struct Checksums {
}
impl Checksummer {
pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
let mut ret = Self {
pub fn new() -> Self {
Self {
crc32: None,
crc32c: None,
md5: None,
sha1: None,
sha256: None,
};
}
}
pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
let mut ret = Self::new();
if expected.md5.is_some() || require_md5 {
ret.md5 = Some(Md5::new());
@ -179,3 +185,122 @@ impl Checksums {
}
}
}
// ----
/// Extract the value of the x-amz-checksum-algorithm header
pub fn request_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
None => Ok(None),
Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
_ => Err(Error::bad_request("invalid checksum algorithm")),
}
}
pub fn request_trailer_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? {
None => Ok(None),
Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)),
Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)),
Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)),
Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)),
_ => Err(Error::bad_request("invalid checksum algorithm")),
}
}
/// Extract the value of any of the x-amz-checksum-* headers
pub fn request_checksum_value(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumValue>, Error> {
let mut ret = vec![];
if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
let crc32 = BASE64_STANDARD
.decode(&crc32_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
ret.push(ChecksumValue::Crc32(crc32))
}
if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
let crc32c = BASE64_STANDARD
.decode(&crc32c_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
ret.push(ChecksumValue::Crc32c(crc32c))
}
if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
let sha1 = BASE64_STANDARD
.decode(&sha1_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
ret.push(ChecksumValue::Sha1(sha1))
}
if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
let sha256 = BASE64_STANDARD
.decode(&sha256_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
ret.push(ChecksumValue::Sha256(sha256))
}
if ret.len() > 1 {
return Err(Error::bad_request(
"multiple x-amz-checksum-* headers given",
));
}
Ok(ret.pop())
}
/// Checks for the presence of x-amz-checksum-algorithm
/// if so extract the corresponding x-amz-checksum-* value
pub fn request_checksum_algorithm_value(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumValue>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
Some(x) if x == "CRC32" => {
let crc32 = headers
.get(X_AMZ_CHECKSUM_CRC32)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
Ok(Some(ChecksumValue::Crc32(crc32)))
}
Some(x) if x == "CRC32C" => {
let crc32c = headers
.get(X_AMZ_CHECKSUM_CRC32C)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
Ok(Some(ChecksumValue::Crc32c(crc32c)))
}
Some(x) if x == "SHA1" => {
let sha1 = headers
.get(X_AMZ_CHECKSUM_SHA1)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
Ok(Some(ChecksumValue::Sha1(sha1)))
}
Some(x) if x == "SHA256" => {
let sha256 = headers
.get(X_AMZ_CHECKSUM_SHA256)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
Ok(Some(ChecksumValue::Sha256(sha256)))
}
Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
None => Ok(None),
}
}

View file

@ -11,6 +11,7 @@ use garage_util::data::{sha256sum, Hash};
use error::*;
pub mod body;
pub mod checksum;
pub mod error;
pub mod payload;
@ -51,7 +52,7 @@ pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
#[derive(Debug)]
pub enum ContentSha256Header {
UnsignedPayload,
Sha256Hash(Hash),
Sha256Checksum(Hash),
StreamingPayload { trailer: bool, signed: bool },
}
@ -90,15 +91,6 @@ pub async fn verify_request(
})
}
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
return Err(Error::bad_request(
"Request content hash does not match signed hash".to_string(),
));
}
Ok(())
}
pub fn signing_hmac(
datetime: &DateTime<Utc>,
secret_key: &str,

View file

@ -94,7 +94,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
Ok(ContentSha256Header::Sha256Hash(sha256))
Ok(ContentSha256Header::Sha256Checksum(sha256))
}
}

View file

@ -1,21 +1,22 @@
use std::pin::Pin;
use std::sync::Mutex;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use hmac::Mac;
use http_body_util::StreamBody;
use hyper::body::{Bytes, Incoming as IncomingBody};
use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use hyper::Request;
use garage_util::data::Hash;
use super::*;
use crate::helpers::*;
use crate::helpers::body_stream;
use crate::signature::checksum::*;
use crate::signature::payload::CheckedSignature;
pub type ReqBody = BoxBody<Error>;
pub use crate::signature::body::ReqBody;
pub fn parse_streaming_body(
req: Request<IncomingBody>,
@ -23,6 +24,20 @@ pub fn parse_streaming_body(
region: &str,
service: &str,
) -> Result<Request<ReqBody>, Error> {
let expected_checksums = ExpectedChecksums {
md5: match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
sha256: match &checked_signature.content_sha256_header {
ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
_ => None,
},
extra: None,
};
let mut checksummer = Checksummer::init(&expected_checksums, false);
match checked_signature.content_sha256_header {
ContentSha256Header::StreamingPayload { signed, trailer } => {
if !signed && !trailer {
@ -31,6 +46,11 @@ pub fn parse_streaming_body(
));
}
if trailer {
let algo = request_trailer_checksum_algorithm(req.headers())?;
checksummer = checksummer.add(algo);
}
let sign_params = if signed {
let signature = checked_signature
.signature_header
@ -77,14 +97,24 @@ pub fn parse_streaming_body(
Ok(req.map(move |body| {
let stream = body_stream::<_, Error>(body);
let signed_payload_stream =
StreamingPayloadStream::new(stream, sign_params, trailer)
.map(|x| x.map(hyper::body::Frame::data))
.map_err(Error::from);
ReqBody::new(StreamBody::new(signed_payload_stream))
StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from);
ReqBody {
stream: Mutex::new(signed_payload_stream.boxed()),
checksummer,
expected_checksums,
}
}))
}
_ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))),
_ => Ok(req.map(|body| {
let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
ReqBody {
stream: Mutex::new(stream.boxed()),
checksummer,
expected_checksums,
}
})),
}
}
@ -386,7 +416,7 @@ impl<S> Stream for StreamingPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
type Item = Result<Bytes, StreamingPayloadError>;
type Item = Result<Frame<Bytes>, StreamingPayloadError>;
fn poll_next(
self: Pin<&mut Self>,
@ -450,7 +480,7 @@ where
return Poll::Ready(None);
}
return Poll::Ready(Some(Ok(data)));
return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
if let Some(signing) = this.signing.as_mut() {

View file

@ -20,7 +20,7 @@ pub async fn handle_insert_batch(
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let items = req.into_body().json::<Vec<InsertBatchItem>>().await?;
let mut items2 = vec![];
for it in items {
@ -47,7 +47,7 @@ pub async fn handle_read_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?;
let resp_results = futures::future::join_all(
queries
@ -141,7 +141,7 @@ pub async fn handle_delete_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?;
let resp_results = futures::future::join_all(
queries
@ -262,7 +262,7 @@ pub(crate) async fn handle_poll_range(
} = ctx;
use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
let query = req.into_body().json::<PollRangeQuery>().await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;

View file

@ -144,9 +144,7 @@ pub async fn handle_insert_item(
.map(parse_causality_token)
.transpose()?;
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
.to_bytes();
let body = req.into_body().collect().await?;
let value = DvvsValue::Value(body.to_vec());

View file

@ -125,7 +125,7 @@ impl ApiHandler for S3ApiServer {
let req = verified_request.request;
let api_key = verified_request.access_key;
let content_sha256 = match verified_request.content_sha256_header {
ContentSha256Header::Sha256Hash(h) => Some(h),
ContentSha256Header::Sha256Checksum(h) => Some(h),
// TODO take into account streaming/trailer checksums, etc.
_ => None,
};
@ -141,14 +141,7 @@ impl ApiHandler for S3ApiServer {
// Special code path for CreateBucket API endpoint
if let Endpoint::CreateBucket {} = endpoint {
return handle_create_bucket(
&garage,
req,
content_sha256,
&api_key.key_id,
bucket_name,
)
.await;
return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await;
}
let bucket_id = garage
@ -186,7 +179,7 @@ impl ApiHandler for S3ApiServer {
let resp = match endpoint {
Endpoint::HeadObject {
key, part_number, ..
} => handle_head(ctx, &req, &key, part_number).await,
} => handle_head(ctx, &req.map(|_| ()), &key, part_number).await,
Endpoint::GetObject {
key,
part_number,
@ -206,7 +199,7 @@ impl ApiHandler for S3ApiServer {
response_content_type,
response_expires,
};
handle_get(ctx, &req, &key, part_number, overrides).await
handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await
}
Endpoint::UploadPart {
key,
@ -228,7 +221,7 @@ impl ApiHandler for S3ApiServer {
handle_create_multipart_upload(ctx, &req, &key).await
}
Endpoint::CompleteMultipartUpload { key, upload_id } => {
handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
handle_complete_multipart_upload(ctx, req, &key, &upload_id).await
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
@ -331,17 +324,15 @@ impl ApiHandler for S3ApiServer {
};
handle_list_parts(ctx, req, &query).await
}
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await,
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await,
Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await,
Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
Endpoint::PutBucketLifecycleConfiguration {} => {
handle_put_lifecycle(ctx, req, content_sha256).await
}
Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await,
Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};

View file

@ -1,6 +1,5 @@
use std::collections::HashMap;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_model::bucket_alias_table::*;
@ -10,12 +9,10 @@ use garage_model::key_table::Key;
use garage_model::permission::BucketKeyPerm;
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
use garage_api_common::common_error::CommonError;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -122,15 +119,10 @@ pub async fn handle_list_buckets(
pub async fn handle_create_bucket(
garage: &Garage,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
api_key_id: &String,
bucket_name: String,
) -> Result<Response<ResBody>, Error> {
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body = req.into_body().collect().await?;
let cmd =
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;

View file

@ -8,8 +8,6 @@ use md5::{Digest, Md5};
use sha1::Sha1;
use sha2::Sha256;
use http::{HeaderMap, HeaderValue};
use garage_util::error::OkOrMessage;
use garage_model::s3::object_table::*;
@ -112,112 +110,6 @@ impl MultipartChecksummer {
}
}
// ----
/// Extract the value of the x-amz-checksum-algorithm header
pub(crate) fn request_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
None => Ok(None),
Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
_ => Err(Error::bad_request("invalid checksum algorithm")),
}
}
/// Extract the value of any of the x-amz-checksum-* headers
pub(crate) fn request_checksum_value(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumValue>, Error> {
let mut ret = vec![];
if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
let crc32 = BASE64_STANDARD
.decode(&crc32_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
ret.push(ChecksumValue::Crc32(crc32))
}
if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
let crc32c = BASE64_STANDARD
.decode(&crc32c_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
ret.push(ChecksumValue::Crc32c(crc32c))
}
if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
let sha1 = BASE64_STANDARD
.decode(&sha1_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
ret.push(ChecksumValue::Sha1(sha1))
}
if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
let sha256 = BASE64_STANDARD
.decode(&sha256_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
ret.push(ChecksumValue::Sha256(sha256))
}
if ret.len() > 1 {
return Err(Error::bad_request(
"multiple x-amz-checksum-* headers given",
));
}
Ok(ret.pop())
}
/// Checks for the presence of x-amz-checksum-algorithm
/// if so extract the corresponding x-amz-checksum-* value
pub(crate) fn request_checksum_algorithm_value(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumValue>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
Some(x) if x == "CRC32" => {
let crc32 = headers
.get(X_AMZ_CHECKSUM_CRC32)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
Ok(Some(ChecksumValue::Crc32(crc32)))
}
Some(x) if x == "CRC32C" => {
let crc32c = headers
.get(X_AMZ_CHECKSUM_CRC32C)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
Ok(Some(ChecksumValue::Crc32c(crc32c)))
}
Some(x) if x == "SHA1" => {
let sha1 = headers
.get(X_AMZ_CHECKSUM_SHA1)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
Ok(Some(ChecksumValue::Sha1(sha1)))
}
Some(x) if x == "SHA256" => {
let sha256 = headers
.get(X_AMZ_CHECKSUM_SHA256)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
Ok(Some(ChecksumValue::Sha256(sha256)))
}
Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
None => Ok(None),
}
}
pub(crate) fn add_checksum_response_headers(
checksum: &Option<ChecksumValue>,
mut resp: http::response::Builder,

View file

@ -24,7 +24,6 @@ use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
use crate::api_server::{ReqBody, ResBody};
use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::get::full_object_byte_stream;

View file

@ -2,15 +2,11 @@ use quick_xml::de::from_reader;
use hyper::{header::HeaderName, Method, Request, Response, StatusCode};
use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_util::data::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -59,7 +55,6 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error>
pub async fn handle_put_cors(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -68,11 +63,7 @@ pub async fn handle_put_cors(
..
} = ctx;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body = req.into_body().collect().await?;
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;

View file

@ -1,4 +1,3 @@
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
@ -6,7 +5,6 @@ use garage_util::data::*;
use garage_model::s3::object_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -68,13 +66,8 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>,
pub async fn handle_delete_objects(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body = req.into_body().collect().await?;
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;

View file

@ -12,7 +12,7 @@ use http::header::{
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
LAST_MODIFIED, RANGE,
};
use hyper::{body::Body, Request, Response, StatusCode};
use hyper::{Request, Response, StatusCode};
use tokio::sync::mpsc;
use garage_net::stream::ByteStream;
@ -119,7 +119,7 @@ fn getobject_override_headers(
fn try_answer_cached(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
req: &Request<impl Body>,
req: &Request<()>,
) -> Option<Response<ResBody>> {
// <trinity> It is possible, and is even usually the case, [that both If-None-Match and
// If-Modified-Since] are present in a request. In this situation If-None-Match takes
@ -158,7 +158,7 @@ fn try_answer_cached(
/// Handle HEAD request
pub async fn handle_head(
ctx: ReqCtx,
req: &Request<impl Body>,
req: &Request<()>,
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
@ -168,7 +168,7 @@ pub async fn handle_head(
/// Handle HEAD request for website
pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
req: &Request<()>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
@ -279,7 +279,7 @@ pub async fn handle_head_without_ctx(
/// Handle GET request
pub async fn handle_get(
ctx: ReqCtx,
req: &Request<impl Body>,
req: &Request<()>,
key: &str,
part_number: Option<u64>,
overrides: GetObjectOverrides,
@ -290,7 +290,7 @@ pub async fn handle_get(
/// Handle GET request
pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
req: &Request<impl Body>,
req: &Request<()>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
@ -578,7 +578,7 @@ async fn handle_get_part(
}
fn parse_range_header(
req: &Request<impl Body>,
req: &Request<()>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
@ -619,7 +619,7 @@ struct ChecksumMode {
enabled: bool,
}
fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode {
fn checksum_mode(req: &Request<()>) -> ChecksumMode {
ChecksumMode {
enabled: req
.headers()

View file

@ -1,12 +1,10 @@
use quick_xml::de::from_reader;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -16,7 +14,6 @@ use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
};
use garage_util::data::*;
pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
@ -56,7 +53,6 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E
pub async fn handle_put_lifecycle(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -65,11 +61,7 @@ pub async fn handle_put_lifecycle(
..
} = ctx;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body = req.into_body().collect().await?;
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
let config = conf

View file

@ -17,7 +17,6 @@ use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::checksum::*;
@ -114,7 +113,11 @@ pub async fn handle_put_part(
let key = key.to_string();
let (req_head, req_body) = req.into_parts();
let stream = body_stream(req_body);
let (stream, checksums) = req_body.streaming_with_checksums(true);
let stream = stream.map_err(Error::from);
// TODO checksums
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, object_version, mut mpu), first_block) =
@ -249,7 +252,6 @@ pub async fn handle_complete_multipart_upload(
req: Request<ReqBody>,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -261,11 +263,7 @@ pub async fn handle_complete_multipart_upload(
let expected_checksum = request_checksum_value(&req_head.headers)?;
let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body = req_body.collect().await?;
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml)

View file

@ -22,7 +22,6 @@ use garage_api_common::signature::checksum::*;
use garage_api_common::signature::payload::{verify_v4, Authorization};
use crate::api_server::ResBody;
use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::put::{get_headers, save_stream, ChecksumMode};

View file

@ -79,7 +79,9 @@ pub async fn handle_put(
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
let stream = body_stream(req.into_body());
let (stream, checksums) = req.into_body().streaming_with_checksums(true);
let stream = stream.map_err(Error::from);
// TODO checksums
let res = save_stream(
&ctx,

View file

@ -1,14 +1,11 @@
use quick_xml::de::from_reader;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::*;
use garage_util::data::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -61,7 +58,6 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err
pub async fn handle_put_website(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -70,11 +66,7 @@ pub async fn handle_put_website(
..
} = ctx;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body = req.into_body().collect().await?;
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;

View file

@ -1,6 +1,6 @@
use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
use std::{convert::Infallible, sync::Arc};
use std::sync::Arc;
use tokio::net::{TcpListener, UnixListener};
use tokio::sync::watch;
@ -163,6 +163,8 @@ impl WebServer {
metrics_tags.push(KeyValue::new("host", host_header.clone()));
}
let req = req.map(|_| ());
// The actual handler
let res = self
.serve_file(&req)
@ -218,7 +220,7 @@ impl WebServer {
async fn serve_file(
self: &Arc<Self>,
req: &Request<IncomingBody>,
req: &Request<()>,
) -> Result<Response<BoxBody<ApiError>>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
@ -322,7 +324,7 @@ impl WebServer {
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
.body(empty_body::<Infallible>())
.body(())
.unwrap();
match handle_get_without_ctx(