[dep-upgrade-202402] simplify/refactor GetObject

This commit is contained in:
Alex 2024-02-05 20:26:33 +01:00
parent 81ccd4586e
commit 22332e6c35
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
2 changed files with 79 additions and 83 deletions

View file

@ -11,7 +11,8 @@ use http::header::{
use hyper::{body::Body, Request, Response, StatusCode}; use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_block::manager::BlockStream;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey; use garage_table::EmptyKey;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::OkOrMessage; use garage_util::error::OkOrMessage;
@ -245,7 +246,7 @@ pub async fn handle_get(
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
} }
ObjectVersionData::FirstBlock(_, first_block_hash) => { ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel(2); let (tx, rx) = mpsc::channel::<BlockStream>(2);
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();
let first_block_hash = *first_block_hash; let first_block_hash = *first_block_hash;
@ -283,25 +284,13 @@ pub async fn handle_get(
{ {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
let err = std::io::Error::new( let _ = tx.send(error_stream_item(e)).await;
std::io::ErrorKind::Other,
format!("Error while getting object data: {}", e),
);
let _ = tx
.send(Box::pin(stream::once(future::ready(Err(err)))))
.await;
} }
} }
}); });
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) let body = response_body_from_block_stream(rx);
.flatten() Ok(resp_builder.body(body)?)
.map(|x| {
x.map(hyper::body::Frame::data)
.map_err(|e| Error::from(garage_util::error::Error::from(e)))
});
let body = http_body_util::StreamBody::new(body_stream);
Ok(resp_builder.body(ResBody::new(body))?)
} }
} }
} }
@ -461,67 +450,75 @@ fn body_from_blocks_range(
} }
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();
let mut body_stream = let (tx, rx) = mpsc::channel::<BlockStream>(2);
futures::stream::iter(blocks)
.enumerate()
.map(move |(i, (block, block_offset))| {
let garage = garage.clone();
async move {
garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
.await
.unwrap_or_else(|e| error_stream(i, e))
.scan(block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
let r = if *chunk_offset >= end {
// The current chunk is after the part we want to read.
// Returning None here will stop the scan, the rest of the
// stream will be ignored
None
} else if *chunk_offset + chunk_len <= begin {
// The current chunk is before the part we want to read.
// We return a None that will be removed by the filter_map
// below.
Some(None)
} else {
// The chunk has an intersection with the requested range
let start_in_chunk = if *chunk_offset > begin {
0
} else {
begin - *chunk_offset
};
let end_in_chunk = if *chunk_offset + chunk_len < end {
chunk_len
} else {
end - *chunk_offset
};
Some(Some(Ok(chunk_bytes.slice(
start_in_chunk as usize..end_in_chunk as usize,
))))
};
*chunk_offset += chunk_bytes.len() as u64;
r
}
Err(e) => Some(Some(Err(e))),
};
futures::future::ready(r)
})
.filter_map(futures::future::ready)
}
});
let (tx, rx) = mpsc::channel(2);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(item) = body_stream.next().await { match async {
if tx.send(item.await).await.is_err() { let garage = garage.clone();
break; // connection closed by client for (i, (block, block_offset)) in blocks.iter().enumerate() {
let block_stream = garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
.await?
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
let r = if *chunk_offset >= end {
// The current chunk is after the part we want to read.
// Returning None here will stop the scan, the rest of the
// stream will be ignored
None
} else if *chunk_offset + chunk_len <= begin {
// The current chunk is before the part we want to read.
// We return a None that will be removed by the filter_map
// below.
Some(None)
} else {
// The chunk has an intersection with the requested range
let start_in_chunk = if *chunk_offset > begin {
0
} else {
begin - *chunk_offset
};
let end_in_chunk = if *chunk_offset + chunk_len < end {
chunk_len
} else {
end - *chunk_offset
};
Some(Some(Ok(chunk_bytes
.slice(start_in_chunk as usize..end_in_chunk as usize))))
};
*chunk_offset += chunk_bytes.len() as u64;
r
}
Err(e) => Some(Some(Err(e))),
};
futures::future::ready(r)
})
.filter_map(futures::future::ready);
let block_stream: BlockStream = Box::pin(block_stream);
tx.send(Box::pin(block_stream))
.await
.ok_or_message("channel closed")?;
}
Ok::<(), Error>(())
}
.await
{
Ok(()) => (),
Err(e) => {
let _ = tx.send(error_stream_item(e)).await;
} }
} }
}); });
response_body_from_block_stream(rx)
}
fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
.flatten() .flatten()
.map(|x| { .map(|x| {
@ -531,11 +528,10 @@ fn body_from_blocks_range(
ResBody::new(http_body_util::StreamBody::new(body_stream)) ResBody::new(http_body_util::StreamBody::new(body_stream))
} }
fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
Box::pin(futures::stream::once(async move { let err = std::io::Error::new(
Err(std::io::Error::new( std::io::ErrorKind::Other,
std::io::ErrorKind::Other, format!("Error while getting object data: {}", e),
format!("Could not get block {}: {}", i, e), );
)) Box::pin(stream::once(future::ready(Err(err))))
}))
} }

View file

@ -53,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072;
// to delete the block locally. // to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
pub type BlockStream =
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
/// RPC messages used to share blocks of data between nodes /// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc { pub enum BlockRpc {
@ -324,10 +327,7 @@ impl BlockManager {
&self, &self,
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result< ) -> Result<BlockStream, Error> {
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
Error,
> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header { match header {
DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Plain => Ok(stream),