From 732b4d0b63b58e7b09267b3165c7713ccec23ee5 Mon Sep 17 00:00:00 2001 From: Jill Date: Wed, 17 Nov 2021 18:13:34 +0100 Subject: [PATCH] garage_api: Refactor BodyChunker for stream-composition --- src/api/s3_put.rs | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 37658172..d4bd454f 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,8 +1,9 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; -use futures::stream::*; -use hyper::{Body, Request, Response}; +use futures::prelude::*; +use hyper::body::{Body, Bytes}; +use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -44,7 +45,7 @@ pub async fn handle_put( // Parse body of uploaded file let body = req.into_body(); - let mut chunker = BodyChunker::new(body, garage.config.block_size); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or_default(); // If body is small enough, store it directly in the object table @@ -178,13 +179,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks( +async fn read_and_put_blocks, S: Stream> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, - chunker: &mut BodyChunker, + chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -205,8 +206,11 @@ async fn read_and_put_blocks( .rpc_put_block(first_block_hash, first_block); loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!( + put_curr_block, + put_curr_version_block, + chunker.next().map_err(Into::into), + )?; if let Some(block) = next_block { md5hasher.update(&block[..]); sha256hasher.update(&block[..]); @@ -266,25 +270,26 @@ async fn put_block_meta( Ok(()) } -struct BodyChunker { - body: Body, +struct StreamChunker>, E> { + stream: S, read_all: bool, block_size: usize, buf: VecDeque, } -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { +impl> + Unpin, E> StreamChunker { + fn new(stream: S, block_size: usize) -> Self { Self { - body, + stream, read_all: false, block_size, buf: VecDeque::with_capacity(2 * block_size), } } - async fn next(&mut self) -> Result>, GarageError> { + + async fn next(&mut self) -> Result>, E> { while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { + if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); self.buf.extend(&bytes[..]); @@ -292,6 +297,7 @@ impl BodyChunker { self.read_all = true; } } + if self.buf.is_empty() { Ok(None) } else if self.buf.len() <= self.block_size { @@ -368,12 +374,12 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + let mut chunker = StreamChunker::new(req.into_body(), garage.config.block_size); let (object, version, first_block) = futures::try_join!( garage.object_table.get(&bucket_id, &key), garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next() + chunker.next().map_err(GarageError::from), )?; // Check object is valid and multipart block can be accepted