[dep-upgrade-202402] refactor use of BodyStream

This commit is contained in:
Alex 2024-02-07 15:25:49 +01:00
parent 53746b59e5
commit e011941964
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
5 changed files with 37 additions and 26 deletions

View file

@ -1,7 +1,12 @@
use std::convert::Infallible; use std::convert::Infallible;
use futures::{Stream, StreamExt, TryStreamExt};
use http_body_util::{BodyExt, Full as FullBody}; use http_body_util::{BodyExt, Full as FullBody};
use hyper::{body::Body, Request, Response}; use hyper::{
body::{Body, Bytes},
Request, Response,
};
use idna::domain_to_unicode; use idna::domain_to_unicode;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -187,6 +192,22 @@ where
.unwrap()) .unwrap())
} }
pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
where
B: Body<Data = Bytes>,
<B as Body>::Error: Into<E>,
E: From<Error>,
{
let stream = http_body_util::BodyStream::new(body);
let stream = TryStreamExt::map_err(stream, Into::into);
stream.map(|x| {
x.and_then(|f| {
f.into_data()
.map_err(|_| E::from(Error::bad_request("non-data frame")))
})
})
}
pub fn is_default<T: Default + PartialEq>(v: &T) -> bool { pub fn is_default<T: Default + PartialEq>(v: &T) -> bool {
*v == T::default() *v == T::default()
} }

View file

@ -1,8 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use futures::{prelude::*, TryStreamExt}; use futures::prelude::*;
use http_body_util::BodyStream;
use hyper::{Request, Response}; use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5}; use md5::{Digest as Md5Digest, Md5};
@ -89,10 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists // Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string(); let key = key.to_string();
let body_stream = BodyStream::new(req.into_body()) let stream = body_stream(req.into_body());
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap let mut chunker = StreamChunker::new(stream, garage.config.block_size);
.map_err(Error::from);
let mut chunker = StreamChunker::new(body_stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!( let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id), get_upload(&garage, &bucket_id, &key, &upload_id),

View file

@ -7,8 +7,7 @@ use std::task::{Context, Poll};
use base64::prelude::*; use base64::prelude::*;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Duration, Utc}; use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt, TryStreamExt}; use futures::{Stream, StreamExt};
use http_body_util::BodyStream;
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit}; use multer::{Constraints, Multipart, SizeLimit};
@ -45,10 +44,8 @@ pub async fn handle_post_object(
); );
let (head, body) = req.into_parts(); let (head, body) = req.into_parts();
let body_stream = BodyStream::new(body) let stream = body_stream::<_, Error>(body);
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
.map_err(Error::from);
let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints);
let mut params = HeaderMap::new(); let mut params = HeaderMap::new();
let field = loop { let field = loop {

View file

@ -4,13 +4,13 @@ use std::sync::Arc;
use base64::prelude::*; use base64::prelude::*;
use futures::prelude::*; use futures::prelude::*;
use futures::try_join; use futures::try_join;
use http_body_util::BodyStream;
use hyper::body::Bytes;
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256; use sha2::Sha256;
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use opentelemetry::{ use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context, Context,
@ -51,14 +51,12 @@ pub async fn handle_put(
None => None, None => None,
}; };
let body_stream = BodyStream::new(req.into_body()) let stream = body_stream(req.into_body());
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
.map_err(Error::from);
save_stream( save_stream(
garage, garage,
headers, headers,
body_stream, stream,
bucket, bucket,
key, key,
content_md5, content_md5,

View file

@ -5,7 +5,7 @@ use futures::prelude::*;
use futures::task; use futures::task;
use garage_model::key_table::Key; use garage_model::key_table::Key;
use hmac::Mac; use hmac::Mac;
use http_body_util::{BodyStream, StreamBody}; use http_body_util::StreamBody;
use hyper::body::{Bytes, Incoming as IncomingBody}; use hyper::body::{Bytes, Incoming as IncomingBody};
use hyper::Request; use hyper::Request;
@ -51,11 +51,9 @@ pub fn parse_streaming_body(
.ok_or_internal_error("Unable to build signing HMAC")?; .ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| { Ok(req.map(move |body| {
let body_stream = BodyStream::new(body) let stream = body_stream::<_, Error>(body);
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
.map_err(Error::from);
let signed_payload_stream = let signed_payload_stream =
SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature) SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
.map(|x| x.map(hyper::body::Frame::data)) .map(|x| x.map(hyper::body::Frame::data))
.map_err(Error::from); .map_err(Error::from);
ReqBody::new(StreamBody::new(signed_payload_stream)) ReqBody::new(StreamBody::new(signed_payload_stream))