diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 71f0b1585..f70dad7da 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -11,7 +11,8 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; +use garage_block::manager::BlockStream; +use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; use garage_util::error::OkOrMessage; @@ -245,7 +246,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel(2); + let (tx, rx) = mpsc::channel::(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -283,25 +284,13 @@ pub async fn handle_get( { Ok(()) => (), Err(e) => { - let err = std::io::Error::new( - std::io::ErrorKind::Other, - format!("Error while getting object data: {}", e), - ); - let _ = tx - .send(Box::pin(stream::once(future::ready(Err(err))))) - .await; + let _ = tx.send(error_stream_item(e)).await; } } }); - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) - .flatten() - .map(|x| { - x.map(hyper::body::Frame::data) - .map_err(|e| Error::from(garage_util::error::Error::from(e))) - }); - let body = http_body_util::StreamBody::new(body_stream); - Ok(resp_builder.body(ResBody::new(body))?) + let body = response_body_from_block_stream(rx); + Ok(resp_builder.body(body)?) } } } @@ -461,67 +450,75 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let mut body_stream = - futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (block, block_offset))| { - let garage = garage.clone(); - async move { - garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - .scan(block_offset, move |chunk_offset, chunk| { - let r = match chunk { - Ok(chunk_bytes) => { - let chunk_len = chunk_bytes.len() as u64; - let r = if *chunk_offset >= end { - // The current chunk is after the part we want to read. - // Returning None here will stop the scan, the rest of the - // stream will be ignored - None - } else if *chunk_offset + chunk_len <= begin { - // The current chunk is before the part we want to read. - // We return a None that will be removed by the filter_map - // below. - Some(None) - } else { - // The chunk has an intersection with the requested range - let start_in_chunk = if *chunk_offset > begin { - 0 - } else { - begin - *chunk_offset - }; - let end_in_chunk = if *chunk_offset + chunk_len < end { - chunk_len - } else { - end - *chunk_offset - }; - Some(Some(Ok(chunk_bytes.slice( - start_in_chunk as usize..end_in_chunk as usize, - )))) - }; - *chunk_offset += chunk_bytes.len() as u64; - r - } - Err(e) => Some(Some(Err(e))), - }; - futures::future::ready(r) - }) - .filter_map(futures::future::ready) - } - }); + let (tx, rx) = mpsc::channel::(2); - let (tx, rx) = mpsc::channel(2); tokio::spawn(async move { - while let Some(item) = body_stream.next().await { - if tx.send(item.await).await.is_err() { - break; // connection closed by client + match async { + let garage = garage.clone(); + for (i, (block, block_offset)) in blocks.iter().enumerate() { + let block_stream = garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await? + .scan(*block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) + } else { + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes + .slice(start_in_chunk as usize..end_in_chunk as usize)))) + }; + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready); + + let block_stream: BlockStream = Box::pin(block_stream); + tx.send(Box::pin(block_stream)) + .await + .ok_or_message("channel closed")?; + } + + Ok::<(), Error>(()) + } + .await + { + Ok(()) => (), + Err(e) => { + let _ = tx.send(error_stream_item(e)).await; } } }); + response_body_from_block_stream(rx) +} + +fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -531,11 +528,10 @@ fn body_from_blocks_range( ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { - Box::pin(futures::stream::once(async move { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Could not get block {}: {}", i, e), - )) - })) +fn error_stream_item(e: E) -> BlockStream { + let err = std::io::Error::new( + std::io::ErrorKind::Other, + format!("Error while getting object data: {}", e), + ); + Box::pin(stream::once(future::ready(Err(err)))) } diff --git a/src/block/manager.rs b/src/block/manager.rs index 2d1b5c678..5388f69df 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -53,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); +pub type BlockStream = + Pin> + Send + Sync + 'static>>; + /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -324,10 +327,7 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option, - ) -> Result< - Pin> + Send + Sync + 'static>>, - Error, - > { + ) -> Result { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(stream),