garage_api: Refactor BodyChunker for stream-composition

This commit is contained in:
Jill 2021-11-17 18:13:34 +01:00
parent f7349f4005
commit 732b4d0b63
Signed by: KokaKiwi
GPG key ID: 09A5A2688F13FAC1

View file

@ -1,8 +1,9 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use futures::stream::*;
use hyper::{Body, Request, Response};
use futures::prelude::*;
use hyper::body::{Body, Bytes};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@ -44,7 +45,7 @@ pub async fn handle_put(
// Parse body of uploaded file
let body = req.into_body();
let mut chunker = BodyChunker::new(body, garage.config.block_size);
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
// If body is small enough, store it directly in the object table
@ -178,13 +179,13 @@ fn ensure_checksum_matches(
Ok(())
}
async fn read_and_put_blocks(
async fn read_and_put_blocks<E: Into<GarageError>, S: Stream<Item = Result<Bytes, E>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
chunker: &mut StreamChunker<S, E>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
@ -205,8 +206,11 @@ async fn read_and_put_blocks(
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
let (_, _, next_block) = futures::try_join!(
put_curr_block,
put_curr_version_block,
chunker.next().map_err(Into::into),
)?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
@ -266,25 +270,26 @@ async fn put_block_meta(
Ok(())
}
struct BodyChunker {
body: Body,
struct StreamChunker<S: Stream<Item = Result<Bytes, E>>, E> {
stream: S,
read_all: bool,
block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
impl<S: Stream<Item = Result<Bytes, E>> + Unpin, E> StreamChunker<S, E> {
fn new(stream: S, block_size: usize) -> Self {
Self {
body,
stream,
read_all: false,
block_size,
buf: VecDeque::with_capacity(2 * block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
async fn next(&mut self) -> Result<Option<Vec<u8>>, E> {
while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.body.next().await {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
@ -292,6 +297,7 @@ impl BodyChunker {
self.read_all = true;
}
}
if self.buf.is_empty() {
Ok(None)
} else if self.buf.len() <= self.block_size {
@ -368,12 +374,12 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
let mut chunker = StreamChunker::new(req.into_body(), garage.config.block_size);
let (object, version, first_block) = futures::try_join!(
garage.object_table.get(&bucket_id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
chunker.next()
chunker.next().map_err(GarageError::from),
)?;
// Check object is valid and multipart block can be accepted