some refactoring on data read/write path #729
2 changed files with 7 additions and 12 deletions
|
@ -13,7 +13,7 @@ use http::header::{
|
||||||
use hyper::{body::Body, Request, Response, StatusCode};
|
use hyper::{body::Body, Request, Response, StatusCode};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use garage_block::manager::BlockStream;
|
use garage_net::stream::ByteStream;
|
||||||
use garage_rpc::rpc_helper::OrderTag;
|
use garage_rpc::rpc_helper::OrderTag;
|
||||||
use garage_table::EmptyKey;
|
use garage_table::EmptyKey;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -286,7 +286,7 @@ pub async fn handle_get(
|
||||||
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
|
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
|
||||||
}
|
}
|
||||||
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
||||||
let (tx, rx) = mpsc::channel::<BlockStream>(2);
|
let (tx, rx) = mpsc::channel::<ByteStream>(2);
|
||||||
|
|
||||||
let order_stream = OrderTag::stream();
|
let order_stream = OrderTag::stream();
|
||||||
let first_block_hash = *first_block_hash;
|
let first_block_hash = *first_block_hash;
|
||||||
|
@ -494,7 +494,7 @@ fn body_from_blocks_range(
|
||||||
}
|
}
|
||||||
|
|
||||||
let order_stream = OrderTag::stream();
|
let order_stream = OrderTag::stream();
|
||||||
let (tx, rx) = mpsc::channel::<BlockStream>(2);
|
let (tx, rx) = mpsc::channel::<ByteStream>(2);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match async {
|
match async {
|
||||||
|
@ -542,7 +542,7 @@ fn body_from_blocks_range(
|
||||||
})
|
})
|
||||||
.filter_map(futures::future::ready);
|
.filter_map(futures::future::ready);
|
||||||
|
|
||||||
let block_stream: BlockStream = Box::pin(block_stream);
|
let block_stream: ByteStream = Box::pin(block_stream);
|
||||||
tx.send(Box::pin(block_stream))
|
tx.send(Box::pin(block_stream))
|
||||||
.await
|
.await
|
||||||
.ok_or_message("channel closed")?;
|
.ok_or_message("channel closed")?;
|
||||||
|
@ -562,7 +562,7 @@ fn body_from_blocks_range(
|
||||||
response_body_from_block_stream(rx)
|
response_body_from_block_stream(rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
|
fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
|
||||||
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
|
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
|
||||||
.flatten()
|
.flatten()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
|
@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
|
||||||
ResBody::new(http_body_util::StreamBody::new(body_stream))
|
ResBody::new(http_body_util::StreamBody::new(body_stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
|
fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
|
||||||
let err = std::io::Error::new(
|
let err = std::io::Error::new(
|
||||||
std::io::ErrorKind::Other,
|
std::io::ErrorKind::Other,
|
||||||
format!("Error while getting object data: {}", e),
|
format!("Error while getting object data: {}", e),
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -9,7 +8,6 @@ use bytes::Bytes;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use futures::Stream;
|
|
||||||
use futures_util::stream::StreamExt;
|
use futures_util::stream::StreamExt;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
|
@ -53,9 +51,6 @@ pub const INLINE_THRESHOLD: usize = 3072;
|
||||||
// to delete the block locally.
|
// to delete the block locally.
|
||||||
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
||||||
|
|
||||||
pub type BlockStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
|
|
||||||
|
|
||||||
/// RPC messages used to share blocks of data between nodes
|
/// RPC messages used to share blocks of data between nodes
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum BlockRpc {
|
pub enum BlockRpc {
|
||||||
|
@ -327,7 +322,7 @@ impl BlockManager {
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<BlockStream, Error> {
|
) -> Result<ByteStream, Error> {
|
||||||
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
||||||
match header {
|
match header {
|
||||||
DataBlockHeader::Plain => Ok(stream),
|
DataBlockHeader::Plain => Ok(stream),
|
||||||
|
|
Loading…
Reference in a new issue