use netapp streaming body #343
1 changed files with 4 additions and 6 deletions
|
@ -9,7 +9,7 @@ use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use futures::{Stream, TryStreamExt};
|
use futures::Stream;
|
||||||
use futures_util::stream::StreamExt;
|
use futures_util::stream::StreamExt;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
|
@ -191,7 +191,7 @@ impl BlockManager {
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<(DataBlockHeader, ByteStream), Error> {
|
) -> Result<(DataBlockHeader, ByteStream), Error> {
|
||||||
let who = self.replication.read_nodes(hash);
|
let who = self.replication.read_nodes(hash);
|
||||||
//let who = self.system.rpc.request_order(&who);
|
let who = self.system.rpc.request_order(&who);
|
||||||
|
|
||||||
for node in who.iter() {
|
for node in who.iter() {
|
||||||
let node_id = NodeID::from(*node);
|
let node_id = NodeID::from(*node);
|
||||||
|
@ -238,7 +238,7 @@ impl BlockManager {
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<DataBlock, Error> {
|
) -> Result<DataBlock, Error> {
|
||||||
let who = self.replication.read_nodes(hash);
|
let who = self.replication.read_nodes(hash);
|
||||||
//let who = self.system.rpc.request_order(&who);
|
let who = self.system.rpc.request_order(&who);
|
||||||
|
|
||||||
for node in who.iter() {
|
for node in who.iter() {
|
||||||
let node_id = NodeID::from(*node);
|
let node_id = NodeID::from(*node);
|
||||||
|
@ -296,9 +296,7 @@ impl BlockManager {
|
||||||
> {
|
> {
|
||||||
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
||||||
match header {
|
match header {
|
||||||
DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| {
|
DataBlockHeader::Plain => Ok(Box::pin(stream)),
|
||||||
std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error")
|
|
||||||
}))),
|
|
||||||
DataBlockHeader::Compressed => {
|
DataBlockHeader::Compressed => {
|
||||||
// Too many things, I hate it.
|
// Too many things, I hate it.
|
||||||
let reader = stream_asyncread(stream);
|
let reader = stream_asyncread(stream);
|
||||||
|
|
Loading…
Reference in a new issue