diff --git a/src/api/helpers.rs b/src/api/helpers.rs index ba7b1599..5f488912 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,7 +1,12 @@ use std::convert::Infallible; +use futures::{Stream, StreamExt, TryStreamExt}; + use http_body_util::{BodyExt, Full as FullBody}; -use hyper::{body::Body, Request, Response}; +use hyper::{ + body::{Body, Bytes}, + Request, Response, +}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; @@ -187,6 +192,22 @@ where .unwrap()) } +pub fn body_stream(body: B) -> impl Stream> +where + B: Body, + ::Error: Into, + E: From, +{ + let stream = http_body_util::BodyStream::new(body); + let stream = TryStreamExt::map_err(stream, Into::into); + stream.map(|x| { + x.and_then(|f| { + f.into_data() + .map_err(|_| E::from(Error::bad_request("non-data frame"))) + }) + }) +} + pub fn is_default(v: &T) -> bool { *v == T::default() } diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 4aa27eaf..b9d15b21 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use futures::{prelude::*, TryStreamExt}; -use http_body_util::BodyStream; +use futures::prelude::*; use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; @@ -89,10 +88,8 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body_stream = BodyStream::new(req.into_body()) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); - let mut chunker = StreamChunker::new(body_stream, garage.config.block_size); + let stream = body_stream(req.into_body()); + let mut chunker = StreamChunker::new(stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index e9732dc4..bca8d6c6 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -7,8 +7,7 @@ use std::task::{Context, Poll}; use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; -use futures::{Stream, StreamExt, TryStreamExt}; -use http_body_util::BodyStream; +use futures::{Stream, StreamExt}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; @@ -45,10 +44,8 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let body_stream = BodyStream::new(body) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); - let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints); + let stream = body_stream::<_, Error>(body); + let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 3d43eee8..17424862 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,13 +4,13 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use http_body_util::BodyStream; -use hyper::body::Bytes; -use hyper::header::{HeaderMap, HeaderValue}; -use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use hyper::body::{Body, Bytes}; +use hyper::header::{HeaderMap, HeaderValue}; +use hyper::{Request, Response}; + use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, Context, @@ -51,14 +51,12 @@ pub async fn handle_put( None => None, }; - let body_stream = BodyStream::new(req.into_body()) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); + let stream = body_stream(req.into_body()); save_stream( garage, headers, - body_stream, + stream, bucket, key, content_md5, diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index ea5a64e2..39147ca0 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -5,7 +5,7 @@ use futures::prelude::*; use futures::task; use garage_model::key_table::Key; use hmac::Mac; -use http_body_util::{BodyStream, StreamBody}; +use http_body_util::StreamBody; use hyper::body::{Bytes, Incoming as IncomingBody}; use hyper::Request; @@ -51,11 +51,9 @@ pub fn parse_streaming_body( .ok_or_internal_error("Unable to build signing HMAC")?; Ok(req.map(move |body| { - let body_stream = BodyStream::new(body) - .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap - .map_err(Error::from); + let stream = body_stream::<_, Error>(body); let signed_payload_stream = - SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature) + SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature) .map(|x| x.map(hyper::body::Frame::data)) .map_err(Error::from); ReqBody::new(StreamBody::new(signed_payload_stream))