use netapp streaming body #343

Merged
lx merged 31 commits from netapp-stream-body into main 2022-09-13 13:26:09 +00:00
Showing only changes of commit ff30891999 - Show all commits

View file

@ -7,10 +7,9 @@ use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE, IF_NONE_MATCH, LAST_MODIFIED, RANGE,
}; };
use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use garage_rpc::rpc_helper::OrderTag; use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey; use garage_table::EmptyKey;
use garage_util::data::*; use garage_util::data::*;
@ -274,14 +273,7 @@ pub async fn handle_get(
.block_manager .block_manager
.rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await .await
.unwrap_or_else(|e| { .unwrap_or_else(|e| error_stream(i, e))
Box::pin(futures::stream::once(async move {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Could not get block {}: {}", i, e),
))
}))
})
} }
} }
}) })
@ -437,44 +429,79 @@ fn body_from_blocks_range(
all_blocks.len(), all_blocks.len(),
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
)); ));
let mut true_offset = 0; let mut block_offset: u64 = 0;
for (_, b) in all_blocks.iter() { for (_, b) in all_blocks.iter() {
if true_offset >= end { if block_offset >= end {
break; break;
} }
// Keep only blocks that have an intersection with the requested range // Keep only blocks that have an intersection with the requested range
if true_offset < end && true_offset + b.size > begin { if block_offset < end && block_offset + b.size > begin {
blocks.push((*b, true_offset)); blocks.push((*b, block_offset));
} }
true_offset += b.size; block_offset += b.size as u64;
} }
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks) let body_stream = futures::stream::iter(blocks)
.enumerate() .enumerate()
.map(move |(i, (block, true_offset))| { .map(move |(i, (block, block_offset))| {
let garage = garage.clone(); let garage = garage.clone();
async move { async move {
let data = garage garage
.block_manager .block_manager
.rpc_get_block(&block.hash, Some(order_stream.order(i as u64))) .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
.await?; .await
let start_in_block = if true_offset > begin { .unwrap_or_else(|e| error_stream(i, e))
.scan(block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
let r = if *chunk_offset >= end {
// The current chunk is after the part we want to read.
// Returning None here will stop the scan, the rest of the
// stream will be ignored
None
} else if *chunk_offset + chunk_len <= begin {
// The current chunk is before the part we want to read.
// We return a None that will be removed by the filter_map
// below.
Some(None)
} else {
// The chunk has an intersection with the requested range
let start_in_chunk = if *chunk_offset > begin {
0 0
} else { } else {
begin - true_offset begin - *chunk_offset
}; };
let end_in_block = if true_offset + block.size < end { let end_in_chunk = if *chunk_offset + chunk_len < end {
block.size chunk_len
} else { } else {
end - true_offset end - *chunk_offset
}; };
Result::<Bytes, Error>::Ok( Some(Some(Ok(chunk_bytes
data.slice(start_in_block as usize..end_in_block as usize), .slice(start_in_chunk as usize..end_in_chunk as usize))))
) };
*chunk_offset += chunk_bytes.len() as u64;
r
}
Err(e) => Some(Some(Err(e))),
};
futures::future::ready(r)
})
.filter_map(futures::future::ready)
} }
}) })
.buffered(2); .buffered(2)
.flatten();
hyper::body::Body::wrap_stream(body_stream) hyper::body::Body::wrap_stream(body_stream)
} }
fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
Box::pin(futures::stream::once(async move {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Could not get block {}: {}", i, e),
))
}))
}