use netapp streaming body #343
2 changed files with 7 additions and 9 deletions
|
@ -9,6 +9,7 @@ use bytes::Bytes;
|
||||||
use hyper::{Body, Request, Response};
|
use hyper::{Body, Request, Response};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use garage_rpc::netapp::bytes_buf::BytesBuf;
|
||||||
use garage_rpc::rpc_helper::OrderTag;
|
use garage_rpc::rpc_helper::OrderTag;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -566,7 +567,7 @@ type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
|
||||||
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
|
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
|
||||||
block_size: usize,
|
block_size: usize,
|
||||||
block_stream: Pin<Box<stream::Peekable<S>>>,
|
block_stream: Pin<Box<stream::Peekable<S>>>,
|
||||||
buffer: Vec<u8>,
|
buffer: BytesBuf,
|
||||||
hash: Option<Hash>,
|
hash: Option<Hash>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
|
||||||
Self {
|
Self {
|
||||||
block_size,
|
block_size,
|
||||||
block_stream,
|
block_stream,
|
||||||
buffer: vec![],
|
buffer: BytesBuf::new(),
|
||||||
hash: None,
|
hash: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -593,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
|
||||||
|
|
||||||
if self.buffer.is_empty() {
|
if self.buffer.is_empty() {
|
||||||
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
|
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
|
||||||
self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
|
self.buffer.extend(next_block);
|
||||||
self.hash = next_block_hash;
|
self.hash = next_block_hash;
|
||||||
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
|
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
|
||||||
break;
|
break;
|
||||||
|
@ -604,10 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((
|
Ok((self.buffer.take_all(), self.hash.take()))
|
||||||
Bytes::from(std::mem::take(&mut self.buffer)),
|
|
||||||
self.hash.take(),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -274,11 +274,11 @@ pub async fn handle_get(
|
||||||
.block_manager
|
.block_manager
|
||||||
.rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
|
.rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
|
||||||
.await
|
.await
|
||||||
.unwrap_or_else(|_| {
|
.unwrap_or_else(|e| {
|
||||||
Box::pin(futures::stream::once(async move {
|
Box::pin(futures::stream::once(async move {
|
||||||
Err(std::io::Error::new(
|
Err(std::io::Error::new(
|
||||||
std::io::ErrorKind::Other,
|
std::io::ErrorKind::Other,
|
||||||
"Could not get next block",
|
format!("Could not get block {}: {}", i, e),
|
||||||
))
|
))
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue