Handle streaming payload early in request handling #247
3 changed files with 61 additions and 57 deletions
|
@ -1,7 +1,9 @@
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
|
use futures::prelude::*;
|
||||||
use hyper::header;
|
use hyper::header;
|
||||||
use hyper::server::conn::AddrStream;
|
use hyper::server::conn::AddrStream;
|
||||||
use hyper::service::{make_service_fn, service_fn};
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
|
@ -24,7 +26,10 @@ use garage_model::key_table::Key;
|
||||||
use garage_table::util::*;
|
use garage_table::util::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
use crate::signature::compute_scope;
|
||||||
use crate::signature::payload::check_payload_signature;
|
use crate::signature::payload::check_payload_signature;
|
||||||
|
use crate::signature::streaming::SignedPayloadStream;
|
||||||
|
use crate::signature::LONG_DATETIME;
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
use crate::s3_bucket::*;
|
use crate::s3_bucket::*;
|
||||||
|
@ -158,7 +163,7 @@ async fn handler_stage2(
|
||||||
let authority = req
|
let authority = req
|
||||||
.headers()
|
.headers()
|
||||||
.get(header::HOST)
|
.get(header::HOST)
|
||||||
.ok_or_else(|| Error::BadRequest("HOST header required".to_owned()))?
|
.ok_or_bad_request("Host header required")?
|
||||||
.to_str()?;
|
.to_str()?;
|
||||||
|
|
||||||
let host = authority_to_host(authority)?;
|
let host = authority_to_host(authority)?;
|
||||||
|
@ -221,11 +226,57 @@ async fn handler_stage3(
|
||||||
return handle_options_s3api(garage, &req, bucket_name).await;
|
return handle_options_s3api(garage, &req, bucket_name).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
|
let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?;
|
||||||
let api_key = api_key.ok_or_else(|| {
|
let api_key = api_key.ok_or_else(|| {
|
||||||
Error::Forbidden("Garage does not support anonymous access yet".to_string())
|
Error::Forbidden("Garage does not support anonymous access yet".to_string())
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let req = match req.headers().get("x-amz-content-sha256") {
|
||||||
|
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
|
||||||
|
let signature = content_sha256
|
||||||
|
.take()
|
||||||
|
.ok_or_bad_request("No signature provided")?;
|
||||||
|
|
||||||
|
let secret_key = &api_key
|
||||||
|
.state
|
||||||
|
.as_option()
|
||||||
|
.ok_or_internal_error("Deleted key state")?
|
||||||
|
.secret_key;
|
||||||
|
|
||||||
|
let date = req
|
||||||
|
.headers()
|
||||||
|
.get("x-amz-date")
|
||||||
|
.ok_or_bad_request("Missing X-Amz-Date field")?
|
||||||
|
.to_str()?;
|
||||||
|
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
|
||||||
|
.ok_or_bad_request("Invalid date")?;
|
||||||
|
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
||||||
|
|
||||||
|
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
|
||||||
|
let signing_hmac = crate::signature::signing_hmac(
|
||||||
|
&date,
|
||||||
|
secret_key,
|
||||||
|
&garage.config.s3_api.s3_region,
|
||||||
|
"s3",
|
||||||
|
)
|
||||||
|
.ok_or_internal_error("Unable to build signing HMAC")?;
|
||||||
|
|
||||||
|
req.map(move |body| {
|
||||||
|
Body::wrap_stream(
|
||||||
|
SignedPayloadStream::new(
|
||||||
|
body.map_err(Error::from),
|
||||||
|
signing_hmac,
|
||||||
|
date,
|
||||||
|
&scope,
|
||||||
|
signature,
|
||||||
|
)
|
||||||
|
.map_err(Error::from),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ => req,
|
||||||
|
};
|
||||||
|
|
||||||
let bucket_name = match bucket_name {
|
let bucket_name = match bucket_name {
|
||||||
None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
|
None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
|
||||||
Some(bucket) => bucket.to_string(),
|
Some(bucket) => bucket.to_string(),
|
||||||
|
@ -307,7 +358,7 @@ async fn handler_stage3(
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Endpoint::PutObject { key } => {
|
Endpoint::PutObject { key } => {
|
||||||
handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await
|
handle_put(garage, req, bucket_id, &key, content_sha256).await
|
||||||
}
|
}
|
||||||
Endpoint::AbortMultipartUpload { key, upload_id } => {
|
Endpoint::AbortMultipartUpload { key, upload_id } => {
|
||||||
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
use futures::prelude::*;
|
||||||
use futures::{prelude::*, TryFutureExt};
|
|
||||||
use hyper::body::{Body, Bytes};
|
use hyper::body::{Body, Bytes};
|
||||||
use hyper::header::{HeaderMap, HeaderValue};
|
use hyper::header::{HeaderMap, HeaderValue};
|
||||||
use hyper::{Request, Response};
|
use hyper::{Request, Response};
|
||||||
|
@ -17,23 +16,19 @@ use garage_util::time::*;
|
||||||
use garage_model::block::INLINE_THRESHOLD;
|
use garage_model::block::INLINE_THRESHOLD;
|
||||||
use garage_model::block_ref_table::*;
|
use garage_model::block_ref_table::*;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::Key;
|
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
use garage_model::version_table::*;
|
use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::s3_xml;
|
use crate::s3_xml;
|
||||||
use crate::signature::streaming::SignedPayloadStream;
|
use crate::signature::verify_signed_content;
|
||||||
use crate::signature::LONG_DATETIME;
|
|
||||||
use crate::signature::{compute_scope, verify_signed_content};
|
|
||||||
|
|
||||||
pub async fn handle_put(
|
pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &str,
|
||||||
api_key: &Key,
|
content_sha256: Option<Hash>,
|
||||||
mut content_sha256: Option<Hash>,
|
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
// Retrieve interesting headers from request
|
// Retrieve interesting headers from request
|
||||||
let headers = get_headers(req.headers())?;
|
let headers = get_headers(req.headers())?;
|
||||||
|
@ -43,52 +38,10 @@ pub async fn handle_put(
|
||||||
Some(x) => Some(x.to_str()?.to_string()),
|
Some(x) => Some(x.to_str()?.to_string()),
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
|
|
||||||
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
|
|
||||||
let content_sha256 = content_sha256
|
|
||||||
.take()
|
|
||||||
.ok_or_bad_request("No signature provided")?;
|
|
||||||
Some(content_sha256)
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Parse body of uploaded file
|
let (_head, body) = req.into_parts();
|
||||||
let (head, body) = req.into_parts();
|
|
||||||
let body = body.map_err(Error::from);
|
let body = body.map_err(Error::from);
|
||||||
|
|
||||||
let body = if let Some(signature) = payload_seed_signature {
|
|
||||||
let secret_key = &api_key
|
|
||||||
.state
|
|
||||||
.as_option()
|
|
||||||
.ok_or_internal_error("Deleted key state")?
|
|
||||||
.secret_key;
|
|
||||||
|
|
||||||
let date = head
|
|
||||||
.headers
|
|
||||||
.get("x-amz-date")
|
|
||||||
.ok_or_bad_request("Missing X-Amz-Date field")?
|
|
||||||
.to_str()?;
|
|
||||||
let date: NaiveDateTime =
|
|
||||||
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
|
|
||||||
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
|
||||||
|
|
||||||
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
|
|
||||||
let signing_hmac = crate::signature::signing_hmac(
|
|
||||||
&date,
|
|
||||||
secret_key,
|
|
||||||
&garage.config.s3_api.s3_region,
|
|
||||||
"s3",
|
|
||||||
)
|
|
||||||
.ok_or_internal_error("Unable to build signing HMAC")?;
|
|
||||||
|
|
||||||
SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
|
|
||||||
.map_err(Error::from)
|
|
||||||
.boxed()
|
|
||||||
} else {
|
|
||||||
body.boxed()
|
|
||||||
};
|
|
||||||
|
|
||||||
save_stream(
|
save_stream(
|
||||||
garage,
|
garage,
|
||||||
headers,
|
headers,
|
||||||
|
|
|
@ -164,15 +164,15 @@ where
|
||||||
datetime: DateTime<Utc>,
|
datetime: DateTime<Utc>,
|
||||||
scope: &str,
|
scope: &str,
|
||||||
seed_signature: Hash,
|
seed_signature: Hash,
|
||||||
) -> Result<Self, Error> {
|
) -> Self {
|
||||||
Ok(Self {
|
Self {
|
||||||
stream,
|
stream,
|
||||||
buf: bytes::BytesMut::new(),
|
buf: bytes::BytesMut::new(),
|
||||||
datetime,
|
datetime,
|
||||||
scope: scope.into(),
|
scope: scope.into(),
|
||||||
signing_hmac,
|
signing_hmac,
|
||||||
previous_signature: seed_signature,
|
previous_signature: seed_signature,
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
|
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
|
||||||
|
|
Loading…
Reference in a new issue