Use streaming in block manager

This commit is contained in:
Alex 2022-07-22 18:20:27 +02:00
parent a35d4da721
commit 605a630333
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
7 changed files with 284 additions and 88 deletions

18
Cargo.lock generated
View file

@ -50,6 +50,20 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "async-compression"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00461f243d703f6999c8e7494f077799f1362720a55ae49a90ffe6214032fc0b"
dependencies = [
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"zstd",
"zstd-safe",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.3" version = "0.3.3"
@ -1070,6 +1084,7 @@ name = "garage_block"
version = "0.7.0" version = "0.7.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-compression",
"async-trait", "async-trait",
"bytes 1.2.0", "bytes 1.2.0",
"futures", "futures",
@ -1085,6 +1100,7 @@ dependencies = [
"serde", "serde",
"serde_bytes", "serde_bytes",
"tokio", "tokio",
"tokio-util 0.6.9",
"tracing", "tracing",
"zstd", "zstd",
] ]
@ -1292,7 +1308,7 @@ version = "0.7.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"blake2", "blake2",
"bytes 1.1.0", "bytes 1.2.0",
"chrono", "chrono",
"digest 0.10.3", "digest 0.10.3",
"err-derive 0.3.1", "err-derive 0.3.1",

View file

@ -5,6 +5,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5}; use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes;
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use serde::Serialize; use serde::Serialize;
@ -311,7 +312,7 @@ pub async fn handle_upload_part_copy(
stream::once(async move { stream::once(async move {
let data = garage3.block_manager.rpc_get_block(&block_hash).await?; let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
match range_to_copy { match range_to_copy {
Some(r) => Ok((data[r].to_vec(), None)), Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))), None => Ok((data, Some(block_hash))),
} }
}) })
@ -556,7 +557,7 @@ impl CopyPreconditionHeaders {
} }
} }
type BlockStreamItemOk = (Vec<u8>, Option<Hash>); type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>; type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> { struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
@ -589,7 +590,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() { if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
self.buffer = next_block; self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY
self.hash = next_block_hash; self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size { } else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break; break;
@ -600,7 +601,10 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
} }
} }
Ok((std::mem::take(&mut self.buffer), self.hash.take())) Ok((
Bytes::from(std::mem::take(&mut self.buffer)),
self.hash.take(),
))
} }
} }

View file

@ -242,10 +242,13 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} }
ObjectVersionData::FirstBlock(_, first_block_hash) => { ObjectVersionData::FirstBlock(_, first_block_hash) => {
let read_first_block = garage.block_manager.rpc_get_block(first_block_hash); let read_first_block = garage
.block_manager
.rpc_get_block_streaming(first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; let (first_block_stream, version) =
futures::try_join!(read_first_block, get_next_blocks)?;
let version = version.ok_or(Error::NoSuchKey)?; let version = version.ok_or(Error::NoSuchKey)?;
let mut blocks = version let mut blocks = version
@ -254,24 +257,32 @@ pub async fn handle_get(
.iter() .iter()
.map(|(_, vb)| (vb.hash, None)) .map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
blocks[0].1 = Some(first_block); blocks[0].1 = Some(first_block_stream);
let body_stream = futures::stream::iter(blocks) let body_stream = futures::stream::iter(blocks)
.map(move |(hash, data_opt)| { .map(move |(hash, stream_opt)| {
let garage = garage.clone(); let garage = garage.clone();
async move { async move {
if let Some(data) = data_opt { if let Some(stream) = stream_opt {
Ok(Bytes::from(data)) stream
} else { } else {
garage garage
.block_manager .block_manager
.rpc_get_block(&hash) .rpc_get_block_streaming(&hash)
.await .await
.map(Bytes::from) .unwrap_or_else(|_| {
Box::pin(futures::stream::once(async move {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Could not get next block",
))
}))
})
} }
} }
}) })
.buffered(2); .buffered(3)
.flatten();
let body = hyper::body::Body::wrap_stream(body_stream); let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)

View file

@ -27,6 +27,8 @@ bytes = "1.0"
hex = "0.4" hex = "0.4"
tracing = "0.1.30" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
zstd = { version = "0.9", default-features = false } zstd = { version = "0.9", default-features = false }
rmp-serde = "0.15" rmp-serde = "0.15"
@ -36,3 +38,4 @@ serde_bytes = "0.11"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-util = { version = "0.6", features = ["io"] }

View file

@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub enum DataBlockHeader {
Plain,
Compressed,
}
/// A possibly compressed block of data /// A possibly compressed block of data
#[derive(Debug, Serialize, Deserialize)]
pub enum DataBlock { pub enum DataBlock {
/// Uncompressed data /// Uncompressed data
Plain(#[serde(with = "serde_bytes")] Vec<u8>), Plain(Bytes),
/// Data compressed with zstd /// Data compressed with zstd
Compressed(#[serde(with = "serde_bytes")] Vec<u8>), Compressed(Bytes),
} }
impl DataBlock { impl DataBlock {
@ -31,7 +36,7 @@ impl DataBlock {
/// Get the buffer, possibly decompressing it, and verify it's integrity. /// Get the buffer, possibly decompressing it, and verify it's integrity.
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
/// is used instead. /// is used instead.
pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> { pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
match self { match self {
DataBlock::Plain(data) => { DataBlock::Plain(data) => {
if blake2sum(&data) == hash { if blake2sum(&data) == hash {
@ -40,9 +45,9 @@ impl DataBlock {
Err(Error::CorruptData(hash)) Err(Error::CorruptData(hash))
} }
} }
DataBlock::Compressed(data) => { DataBlock::Compressed(data) => zstd_decode(&data[..])
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) .map_err(|_| Error::CorruptData(hash))
} .map(Bytes::from),
} }
} }
@ -66,14 +71,28 @@ impl DataBlock {
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
if let Some(level) = level { if let Some(level) = level {
if let Ok(data) = zstd_encode(&data[..], level) { if let Ok(data) = zstd_encode(&data[..], level) {
return DataBlock::Compressed(data); return DataBlock::Compressed(data.into());
} }
} }
DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here DataBlock::Plain(data)
}) })
.await .await
.unwrap() .unwrap()
} }
pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
match self {
DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
}
}
pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
match h {
DataBlockHeader::Plain => DataBlock::Plain(bytes),
DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
}
}
} }
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {

View file

@ -1,5 +1,6 @@
use std::convert::TryInto; use std::convert::TryInto;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -8,8 +9,10 @@ use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use futures::{Stream, TryStreamExt};
use futures_util::stream::StreamExt;
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch, Mutex, Notify}; use tokio::sync::{mpsc, watch, Mutex, Notify};
@ -18,6 +21,8 @@ use opentelemetry::{
Context, KeyValue, Context, KeyValue,
}; };
use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db; use garage_db as db;
use garage_db::counted_tree_hack::CountedTree; use garage_db::counted_tree_hack::CountedTree;
@ -70,7 +75,7 @@ pub enum BlockRpc {
/// block /// block
PutBlock { PutBlock {
hash: Hash, hash: Hash,
data: DataBlock, header: DataBlockHeader,
}, },
/// Ask other node if they should have this block, but don't actually have it /// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash), NeedBlockQuery(Hash),
@ -174,56 +179,146 @@ impl BlockManager {
} }
/// Ask nodes that might have a (possibly compressed) block for it /// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
for node in who.iter() {
let node_id = NodeID::from(*node);
let rpc =
self.endpoint
.call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL);
tokio::select! {
res = rpc => {
let res = match res {
Ok(res) => res,
Err(e) => {
debug!("Node {:?} returned error: {}", node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
_ => {
debug!("Node {:?} returned a malformed response", node);
continue;
}
};
return Ok((header, stream));
}
_ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
}
Err(Error::Message(format!(
"Unable to read block {:?}: no node returned a valid block",
hash
)))
}
/// Ask nodes that might have a (possibly compressed) block for it
/// Return its entire body
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> { async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
let resps = self
.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
BlockRpc::GetBlock(*hash),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
.with_timeout(BLOCK_RW_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
for resp in resps { for node in who.iter() {
if let BlockRpc::PutBlock { data, .. } = resp { let node_id = NodeID::from(*node);
return Ok(data); let rpc =
} self.endpoint
.call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL);
tokio::select! {
res = rpc => {
let res = match res {
Ok(res) => res,
Err(e) => {
debug!("Node {:?} returned error: {}", node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
_ => {
debug!("Node {:?} returned a malformed response", node);
continue;
}
};
match read_stream_to_end(stream).await {
Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
Err(e) => {
debug!("Error reading stream from node {:?}: {}", node, e);
}
}
}
_ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
} }
Err(Error::Message(format!( Err(Error::Message(format!(
"Unable to read block {:?}: no valid blocks returned", "Unable to read block {:?}: no node returned a valid block",
hash hash
))) )))
} }
// ---- Public interface ---- // ---- Public interface ----
/// Ask nodes that might have a block for it,
/// return it as a stream
pub async fn rpc_get_block_streaming(
&self,
hash: &Hash,
) -> Result<
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
Error,
> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?;
match header {
DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error")
}))),
DataBlockHeader::Compressed => {
// Too many things, I hate it.
let reader = stream_asyncread(stream);
let reader = BufReader::new(reader);
let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
Ok(Box::pin(tokio_util::io::ReaderStream::new(reader)))
}
}
}
/// Ask nodes that might have a block for it /// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Bytes, Error> {
self.rpc_get_raw_block(hash).await?.verify_get(*hash) self.rpc_get_raw_block(hash).await?.verify_get(*hash)
} }
/// Send block to nodes that should have it /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash); let who = self.replication.write_nodes(&hash);
let data = DataBlock::from_buffer(data, self.compression_level).await;
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
.into_parts();
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
self.system self.system
.rpc .rpc
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
// TODO: remove to_vec() here put_block_rpc,
BlockRpc::PutBlock { hash, data },
RequestStrategy::with_priority(PRIO_NORMAL) RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.replication.write_quorum()) .with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT), .with_timeout(BLOCK_RW_TIMEOUT),
) )
.await?; .await?;
Ok(()) Ok(())
} }
@ -308,13 +403,25 @@ impl BlockManager {
// ---- Reading and writing blocks locally ---- // ---- Reading and writing blocks locally ----
async fn handle_put_block(
&self,
hash: Hash,
header: DataBlockHeader,
stream: Option<ByteStream>,
) -> Result<(), Error> {
let stream = stream.ok_or_message("missing stream")?;
let bytes = read_stream_to_end(stream).await?;
let data = DataBlock::from_parts(header, bytes);
self.write_block(&hash, &data).await
}
/// Write a block to disk /// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage"); let tracer = opentelemetry::global::tracer("garage");
let write_size = data.inner_buffer().len() as u64; let write_size = data.inner_buffer().len() as u64;
let res = self.mutation_lock[hash.as_slice()[0] as usize] self.mutation_lock[hash.as_slice()[0] as usize]
.lock() .lock()
.with_context(Context::current_with_span( .with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"), tracer.start("Acquire mutation_lock"),
@ -329,21 +436,31 @@ impl BlockManager {
self.metrics.bytes_written.add(write_size); self.metrics.bytes_written.add(write_size);
Ok(res) Ok(())
}
async fn handle_get_block(&self, hash: &Hash) -> Resp<BlockRpc> {
let block = match self.read_block(hash).await {
Ok(data) => data,
Err(e) => return Resp::new(Err(e)),
};
let (header, data) = block.into_parts();
self.metrics.bytes_read.add(data.len() as u64);
Resp::new(Ok(BlockRpc::PutBlock {
hash: *hash,
header,
}))
.with_stream_from_buffer(data)
} }
/// Read block from disk, verifying it's integrity /// Read block from disk, verifying it's integrity
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let data = self self.read_block_internal(hash)
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration) .bound_record_duration(&self.metrics.block_read_duration)
.await?; .await
self.metrics
.bytes_read
.add(data.inner_buffer().len() as u64);
Ok(BlockRpc::PutBlock { hash: *hash, data })
} }
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> { async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
@ -366,9 +483,9 @@ impl BlockManager {
drop(f); drop(f);
let data = if compressed { let data = if compressed {
DataBlock::Compressed(data) DataBlock::Compressed(data.into())
} else { } else {
DataBlock::Plain(data) DataBlock::Plain(data.into())
}; };
if data.verify(*hash).is_err() { if data.verify(*hash).is_err() {
@ -675,7 +792,13 @@ impl BlockManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]); .add(1, &[KeyValue::new("to", format!("{:?}", node))]);
} }
let put_block_message = self.read_block(hash).await?; let block = self.read_block(hash).await?;
let (header, bytes) = block.into_parts();
let put_block_message = Req::new(BlockRpc::PutBlock {
hash: *hash,
header,
})?
.with_stream_from_buffer(bytes);
self.system self.system
.rpc .rpc
.try_call_many( .try_call_many(
@ -723,17 +846,19 @@ impl BlockManager {
} }
#[async_trait] #[async_trait]
impl EndpointHandler<BlockRpc> for BlockManager { impl StreamingEndpointHandler<BlockRpc> for BlockManager {
async fn handle( async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
self: &Arc<Self>, match message.msg() {
message: &BlockRpc, BlockRpc::PutBlock { hash, header } => Resp::new(
_from: NodeID, self.handle_put_block(*hash, *header, message.take_stream())
) -> Result<BlockRpc, Error> { .await
match message { .map(|_| BlockRpc::Ok),
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, ),
BlockRpc::GetBlock(h) => self.read_block(h).await, BlockRpc::GetBlock(h) => self.handle_get_block(h).await,
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), BlockRpc::NeedBlockQuery(h) => {
m => Err(Error::unexpected_rpc_message(m)), Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply))
}
m => Resp::new(Err(Error::unexpected_rpc_message(m))),
} }
} }
} }
@ -831,7 +956,7 @@ impl BlockManagerLocked {
hash: &Hash, hash: &Hash,
data: &DataBlock, data: &DataBlock,
mgr: &BlockManager, mgr: &BlockManager,
) -> Result<BlockRpc, Error> { ) -> Result<(), Error> {
let compressed = data.is_compressed(); let compressed = data.is_compressed();
let data = data.inner_buffer(); let data = data.inner_buffer();
@ -842,8 +967,8 @@ impl BlockManagerLocked {
fs::create_dir_all(&directory).await?; fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
(Ok(true), _) => return Ok(BlockRpc::Ok), (Ok(true), _) => return Ok(()),
(Ok(false), false) => return Ok(BlockRpc::Ok), (Ok(false), false) => return Ok(()),
(Ok(false), true) => { (Ok(false), true) => {
let path_to_delete = path.clone(); let path_to_delete = path.clone();
path.set_extension("zst"); path.set_extension("zst");
@ -882,7 +1007,7 @@ impl BlockManagerLocked {
dir.sync_all().await?; dir.sync_all().await?;
drop(dir); drop(dir);
Ok(BlockRpc::Ok) Ok(())
} }
async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
@ -963,3 +1088,17 @@ impl ErrorCounter {
self.last_try + self.delay_msec() self.last_try + self.delay_msec()
} }
} }
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
let mut parts: Vec<Bytes> = vec![];
while let Some(part) = stream.next().await {
parts.push(part.ok_or_message("error in stream")?);
}
Ok(parts
.iter()
.map(|x| &x[..])
.collect::<Vec<_>>()
.concat()
.into())
}

View file

@ -15,10 +15,13 @@ use opentelemetry::{
Context, Context,
}; };
pub use netapp::endpoint::{Endpoint, EndpointHandler}; pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
pub use netapp::message::{Message as Rpc, *}; use netapp::message::IntoReq;
pub use netapp::message::{
Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{NetApp, NodeID}; pub use netapp::{self, NetApp, NodeID};
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
@ -117,7 +120,7 @@ impl RpcHelper {
where where
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M> + Send, N: IntoReq<M> + Send,
H: EndpointHandler<M>, H: StreamingEndpointHandler<M>,
{ {
let metric_tags = [ let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()), KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
@ -172,7 +175,7 @@ impl RpcHelper {
where where
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M>, N: IntoReq<M>,
H: EndpointHandler<M>, H: StreamingEndpointHandler<M>,
{ {
let msg = msg.into_req().map_err(netapp::error::Error::from)?; let msg = msg.into_req().map_err(netapp::error::Error::from)?;
@ -197,7 +200,7 @@ impl RpcHelper {
where where
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M>, N: IntoReq<M>,
H: EndpointHandler<M>, H: StreamingEndpointHandler<M>,
{ {
let to = self let to = self
.0 .0
@ -211,16 +214,17 @@ impl RpcHelper {
/// Make a RPC call to multiple servers, returning either a Vec of responses, /// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors /// or an error if quorum could not be reached due to too many errors
pub async fn try_call_many<M, H, S>( pub async fn try_call_many<M, N, H, S>(
&self, &self,
endpoint: &Arc<Endpoint<M, H>>, endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid], to: &[Uuid],
msg: M, msg: N,
strategy: RequestStrategy, strategy: RequestStrategy,
) -> Result<Vec<S>, Error> ) -> Result<Vec<S>, Error>
where where
M: Rpc<Response = Result<S, Error>> + 'static, M: Rpc<Response = Result<S, Error>> + 'static,
H: EndpointHandler<M> + 'static, N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static, S: Send + 'static,
{ {
let quorum = strategy.rs_quorum.unwrap_or(to.len()); let quorum = strategy.rs_quorum.unwrap_or(to.len());
@ -261,7 +265,7 @@ impl RpcHelper {
where where
M: Rpc<Response = Result<S, Error>> + 'static, M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>, N: IntoReq<M>,
H: EndpointHandler<M> + 'static, H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static, S: Send + 'static,
{ {
let msg = msg.into_req().map_err(netapp::error::Error::from)?; let msg = msg.into_req().map_err(netapp::error::Error::from)?;