use netapp streaming body #343

Merged
lx merged 31 commits from netapp-stream-body into main 2022-09-13 13:26:09 +00:00
5 changed files with 64 additions and 26 deletions
Showing only changes of commit bc977f9a7a - Show all commits

2
Cargo.lock generated
View file

@ -2176,7 +2176,7 @@ dependencies = [
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.5.0" version = "0.5.0"
source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#3fd30c6e280fba41377c8b563352d756e8bc1caf" source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",

View file

@ -9,6 +9,7 @@ use bytes::Bytes;
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use serde::Serialize; use serde::Serialize;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*; use garage_table::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::time::*; use garage_util::time::*;
@ -306,11 +307,16 @@ pub async fn handle_upload_part_copy(
// if and only if the block returned is a block that already existed // if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again). // in the Garage data store (thus we don't need to save it again).
let garage2 = garage.clone(); let garage2 = garage.clone();
let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy) let source_blocks = stream::iter(blocks_to_copy)
.flat_map(|(block_hash, range_to_copy)| { .enumerate()
.flat_map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone(); let garage3 = garage2.clone();
stream::once(async move { stream::once(async move {
let data = garage3.block_manager.rpc_get_block(&block_hash).await?; let data = garage3
.block_manager
.rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
.await?;
match range_to_copy { match range_to_copy {
Some(r) => Ok((data.slice(r), None)), Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))), None => Ok((data, Some(block_hash))),

View file

@ -10,6 +10,7 @@ use http::header::{
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey; use garage_table::EmptyKey;
use garage_util::data::*; use garage_util::data::*;
@ -242,9 +243,11 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} }
ObjectVersionData::FirstBlock(_, first_block_hash) => { ObjectVersionData::FirstBlock(_, first_block_hash) => {
let order_stream = OrderTag::stream();
let read_first_block = garage let read_first_block = garage
.block_manager .block_manager
.rpc_get_block_streaming(first_block_hash); .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
let (first_block_stream, version) = let (first_block_stream, version) =
@ -260,7 +263,8 @@ pub async fn handle_get(
blocks[0].1 = Some(first_block_stream); blocks[0].1 = Some(first_block_stream);
let body_stream = futures::stream::iter(blocks) let body_stream = futures::stream::iter(blocks)
.map(move |(hash, stream_opt)| { .enumerate()
.map(move |(i, (hash, stream_opt))| {
let garage = garage.clone(); let garage = garage.clone();
async move { async move {
if let Some(stream) = stream_opt { if let Some(stream) = stream_opt {
@ -268,7 +272,7 @@ pub async fn handle_get(
} else { } else {
garage garage
.block_manager .block_manager
.rpc_get_block_streaming(&hash) .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await .await
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
Box::pin(futures::stream::once(async move { Box::pin(futures::stream::once(async move {
@ -281,7 +285,7 @@ pub async fn handle_get(
} }
} }
}) })
.buffered(3) .buffered(2)
.flatten(); .flatten();
let body = hyper::body::Body::wrap_stream(body_stream); let body = hyper::body::Body::wrap_stream(body_stream);
@ -445,11 +449,16 @@ fn body_from_blocks_range(
true_offset += b.size; true_offset += b.size;
} }
let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks) let body_stream = futures::stream::iter(blocks)
.map(move |(block, true_offset)| { .enumerate()
.map(move |(i, (block, true_offset))| {
let garage = garage.clone(); let garage = garage.clone();
async move { async move {
let data = garage.block_manager.rpc_get_block(&block.hash).await?; let data = garage
.block_manager
.rpc_get_block(&block.hash, Some(order_stream.order(i as u64)))
.await?;
let start_in_block = if true_offset > begin { let start_in_block = if true_offset > begin {
0 0
} else { } else {

View file

@ -33,6 +33,7 @@ use garage_util::metrics::RecordDuration;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_rpc::*; use garage_rpc::*;
@ -70,7 +71,7 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
pub enum BlockRpc { pub enum BlockRpc {
Ok, Ok,
/// Message to ask for a block of data, by hash /// Message to ask for a block of data, by hash
GetBlock(Hash), GetBlock(Hash, Option<OrderTag>),
/// Message to send a block of data, either because requested, of for first delivery of new /// Message to send a block of data, either because requested, of for first delivery of new
/// block /// block
PutBlock { PutBlock {
@ -183,15 +184,18 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming( async fn rpc_get_raw_block_streaming(
&self, &self,
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> { ) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
//let who = self.system.rpc.request_order(&who); //let who = self.system.rpc.request_order(&who);
for node in who.iter() { for node in who.iter() {
let node_id = NodeID::from(*node); let node_id = NodeID::from(*node);
let rpc = let rpc = self.endpoint.call_streaming(
self.endpoint &node_id,
.call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); BlockRpc::GetBlock(*hash, order_tag),
PRIO_NORMAL,
);
tokio::select! { tokio::select! {
res = rpc => { res = rpc => {
let res = match res { let res = match res {
@ -224,15 +228,21 @@ impl BlockManager {
/// Ask nodes that might have a (possibly compressed) block for it /// Ask nodes that might have a (possibly compressed) block for it
/// Return its entire body /// Return its entire body
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> { async fn rpc_get_raw_block(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
//let who = self.system.rpc.request_order(&who); //let who = self.system.rpc.request_order(&who);
for node in who.iter() { for node in who.iter() {
let node_id = NodeID::from(*node); let node_id = NodeID::from(*node);
let rpc = let rpc = self.endpoint.call_streaming(
self.endpoint &node_id,
.call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); BlockRpc::GetBlock(*hash, order_tag),
PRIO_NORMAL,
);
tokio::select! { tokio::select! {
res = rpc => { res = rpc => {
let res = match res { let res = match res {
@ -275,11 +285,12 @@ impl BlockManager {
pub async fn rpc_get_block_streaming( pub async fn rpc_get_block_streaming(
&self, &self,
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result< ) -> Result<
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>, Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
Error, Error,
> { > {
let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header { match header {
DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error")
@ -295,8 +306,14 @@ impl BlockManager {
} }
/// Ask nodes that might have a block for it /// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Bytes, Error> { pub async fn rpc_get_block(
self.rpc_get_raw_block(hash).await?.verify_get(*hash) &self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<Bytes, Error> {
self.rpc_get_raw_block(hash, order_tag)
.await?
.verify_get(*hash)
} }
/// Send block to nodes that should have it /// Send block to nodes that should have it
@ -441,7 +458,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
async fn handle_get_block(&self, hash: &Hash) -> Resp<BlockRpc> { async fn handle_get_block(&self, hash: &Hash, order_tag: Option<OrderTag>) -> Resp<BlockRpc> {
let block = match self.read_block(hash).await { let block = match self.read_block(hash).await {
Ok(data) => data, Ok(data) => data,
Err(e) => return Resp::new(Err(e)), Err(e) => return Resp::new(Err(e)),
@ -449,11 +466,17 @@ impl BlockManager {
let (header, data) = block.into_parts(); let (header, data) = block.into_parts();
Resp::new(Ok(BlockRpc::PutBlock { let resp = Resp::new(Ok(BlockRpc::PutBlock {
hash: *hash, hash: *hash,
header, header,
})) }))
.with_stream_from_buffer(data) .with_stream_from_buffer(data);
if let Some(order_tag) = order_tag {
resp.with_order_tag(order_tag)
} else {
resp
}
} }
/// Read block from disk, verifying it's integrity /// Read block from disk, verifying it's integrity
@ -841,7 +864,7 @@ impl BlockManager {
hash hash
); );
let block_data = self.rpc_get_raw_block(hash).await?; let block_data = self.rpc_get_raw_block(hash, None).await?;
self.metrics.resync_recv_counter.add(1); self.metrics.resync_recv_counter.add(1);
@ -861,7 +884,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
.await .await
.map(|_| BlockRpc::Ok), .map(|_| BlockRpc::Ok),
), ),
BlockRpc::GetBlock(h) => self.handle_get_block(h).await, BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => { BlockRpc::NeedBlockQuery(h) => {
Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply))
} }

View file

@ -18,7 +18,7 @@ use opentelemetry::{
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
use netapp::message::IntoReq; use netapp::message::IntoReq;
pub use netapp::message::{ pub use netapp::message::{
Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
}; };
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID}; pub use netapp::{self, NetApp, NodeID};