rewrite read_and_put_block as a series of steps with channels #734

Merged
lx merged 3 commits from refactor-put into main 2024-02-26 17:52:45 +00:00
3 changed files with 22 additions and 3 deletions
Showing only changes of commit b76c0c102e - Show all commits

View file

@ -387,7 +387,10 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block. // we need to insert that data as a new block.
async move { async move {
if must_upload { if must_upload {
garage2.block_manager.rpc_put_block(final_hash, data).await garage2
.block_manager
.rpc_put_block(final_hash, data, None)
.await
} else { } else {
Ok(()) Ok(())
} }

View file

@ -20,6 +20,7 @@ use opentelemetry::{
}; };
use garage_net::bytes_buf::BytesBuf; use garage_net::bytes_buf::BytesBuf;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*; use garage_table::*;
use garage_util::async_hash::*; use garage_util::async_hash::*;
use garage_util::data::*; use garage_util::data::*;
@ -380,6 +381,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
let put_blocks = async { let put_blocks = async {
// Structure for handling several concurrent writes to storage nodes // Structure for handling several concurrent writes to storage nodes
let order_stream = OrderTag::stream();
let mut write_futs = FuturesOrdered::new(); let mut write_futs = FuturesOrdered::new();
let mut written_bytes = 0u64; let mut written_bytes = 0u64;
loop { loop {
@ -421,6 +423,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
offset, offset,
hash, hash,
block, block,
order_stream.order(written_bytes),
)); ));
} }
while let Some(res) = write_futs.next().await { while let Some(res) = write_futs.next().await {
@ -450,6 +453,7 @@ async fn put_block_and_meta(
offset: u64, offset: u64,
hash: Hash, hash: Hash,
block: Bytes, block: Bytes,
order_tag: OrderTag,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let mut version = version.clone(); let mut version = version.clone();
version.blocks.put( version.blocks.put(
@ -470,7 +474,9 @@ async fn put_block_and_meta(
}; };
futures::try_join!( futures::try_join!(
garage.block_manager.rpc_put_block(hash, block), garage
.block_manager
.rpc_put_block(hash, block, Some(order_tag)),
garage.version_table.insert(&version), garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref), garage.block_ref_table.insert(&block_ref),
)?; )?;

View file

@ -346,7 +346,12 @@ impl BlockManager {
} }
/// Send block to nodes that should have it /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash); let who = self.replication.write_nodes(&hash);
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
@ -354,6 +359,11 @@ impl BlockManager {
.into_parts(); .into_parts();
let put_block_rpc = let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
put_block_rpc.with_order_tag(tag)
} else {
put_block_rpc
};
self.system self.system
.rpc .rpc