From 3fe94cc14fa6e2ec2f749afc3b2c4f4f0fa621fc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 26 Feb 2024 17:22:16 +0100 Subject: [PATCH 1/3] [refactor-put] rewrite read_and_put_block as a series of steps with channels --- src/api/s3/multipart.rs | 14 +-- src/api/s3/put.rs | 184 ++++++++++++++++++++++++---------------- 2 files changed, 111 insertions(+), 87 deletions(-) diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index b9d15b21..5959bcd6 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -6,7 +6,6 @@ use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; use garage_table::*; -use garage_util::async_hash::*; use garage_util::data::*; use garage_model::bucket_table::Bucket; @@ -135,17 +134,8 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - part_number, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; + let (total_size, data_md5sum, data_sha256sum, _) = + read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?; // Verify that checksums map ensure_checksum_matches( diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index fdfa567d..557d1e5f 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -7,6 +7,8 @@ use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use tokio::sync::mpsc; + use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; @@ -168,17 +170,8 @@ pub(crate) async fn save_stream> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; + let (total_size, data_md5sum, data_sha256sum, first_block_hash) = + read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?; ensure_checksum_matches( data_md5sum.as_slice(), @@ -299,84 +292,121 @@ pub(crate) async fn read_and_put_blocks> + version: &Version, part_number: u64, first_block: Bytes, - first_block_hash: Hash, chunker: &mut StreamChunker, -) -> Result<(u64, GenericArray, Hash), Error> { +) -> Result<(u64, GenericArray, Hash, Hash), Error> { let tracer = opentelemetry::global::tracer("garage"); - let md5hasher = AsyncHasher::::new(); - let sha256hasher = AsyncHasher::::new(); - - futures::future::join( - md5hasher.update(first_block.clone()), - sha256hasher.update(first_block.clone()), - ) - .with_context(Context::current_with_span( - tracer.start("Hash first block (md5, sha256)"), - )) - .await; - - let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_block_meta( - garage, - version, - part_number, - 0, - first_block_hash, - first_block.len() as u64, - ); - let mut put_curr_block = garage - .block_manager - .rpc_put_block(first_block_hash, first_block); - - loop { - let (_, _, next_block) = futures::try_join!( - put_curr_block.map_err(Error::from), - put_curr_version_block.map_err(Error::from), - chunker.next(), - )?; - if let Some(block) = next_block { - let (_, _, block_hash) = futures::future::join3( - md5hasher.update(block.clone()), - sha256hasher.update(block.clone()), - async_blake2sum(block.clone()), - ) - .with_context(Context::current_with_span( - tracer.start("Hash block (md5, sha256, blake2)"), - )) - .await; - let block_len = block.len(); - put_curr_version_block = put_block_meta( - garage, - version, - part_number, - next_offset as u64, - block_hash, - block_len as u64, - ); - put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); - next_offset += block_len; - } else { - break; + let (block_tx, mut block_rx) = mpsc::channel::>(2); + let read_blocks = async { + block_tx.send(Ok(first_block)).await?; + loop { + let res = chunker + .next() + .with_context(Context::current_with_span( + tracer.start("Read block from client"), + )) + .await; + match res { + Ok(Some(block)) => block_tx.send(Ok(block)).await?, + Ok(None) => break, + Err(e) => { + block_tx.send(Err(e)).await?; + break; + } + } } - } + drop(block_tx); + Ok::<_, mpsc::error::SendError<_>>(()) + }; - let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize().await; + let (block_tx2, mut block_rx2) = mpsc::channel::>(1); + let hash_stream = async { + let md5hasher = AsyncHasher::::new(); + let sha256hasher = AsyncHasher::::new(); + while let Some(next) = block_rx.recv().await { + match next { + Ok(block) => { + block_tx2.send(Ok(block.clone())).await?; + futures::future::join( + md5hasher.update(block.clone()), + sha256hasher.update(block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash block (md5, sha256)"), + )) + .await; + } + Err(e) => { + block_tx2.send(Err(e)).await?; + break; + } + } + } + drop(block_tx2); + Ok::<_, mpsc::error::SendError<_>>(futures::join!( + md5hasher.finalize(), + sha256hasher.finalize() + )) + }; + + let (block_tx3, mut block_rx3) = mpsc::channel::>(1); + let hash_blocks = async { + let mut first_block_hash = None; + while let Some(next) = block_rx2.recv().await { + match next { + Ok(block) => { + let hash = async_blake2sum(block.clone()) + .with_context(Context::current_with_span( + tracer.start("Hash block (blake2)"), + )) + .await; + if first_block_hash.is_none() { + first_block_hash = Some(hash); + } + block_tx3.send(Ok((block, hash))).await?; + } + Err(e) => { + block_tx3.send(Err(e)).await?; + break; + } + } + } + drop(block_tx3); + Ok::<_, mpsc::error::SendError<_>>(first_block_hash.unwrap()) + }; + + let put_blocks = async { + let mut written_bytes = 0u64; + while let Some(next) = block_rx3.recv().await { + let (block, hash) = next?; + let offset = written_bytes; + written_bytes += block.len() as u64; + put_block_and_meta(garage, version, part_number, offset, hash, block).await?; + } + Ok::<_, Error>(written_bytes) + }; + + let (_, stream_hash_result, block_hash_result, final_result) = + futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); + + let total_size = final_result?; + // unwrap here is ok, because if hasher failed, it is because something failed + // later in the pipeline which already caused a return at the ? on previous line + let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap(); + let first_block_hash = block_hash_result.unwrap(); - let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - Ok((total_size, data_md5sum, data_sha256sum)) + Ok((total_size, data_md5sum, data_sha256sum, first_block_hash)) } -async fn put_block_meta( +async fn put_block_and_meta( garage: &Garage, version: &Version, part_number: u64, offset: u64, hash: Hash, - size: u64, + block: Bytes, ) -> Result<(), GarageError> { let mut version = version.clone(); version.blocks.put( @@ -384,7 +414,10 @@ async fn put_block_meta( part_number, offset, }, - VersionBlock { hash, size }, + VersionBlock { + hash, + size: block.len() as u64, + }, ); let block_ref = BlockRef { @@ -394,6 +427,7 @@ async fn put_block_meta( }; futures::try_join!( + garage.block_manager.rpc_put_block(hash, block), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; From babccd2ad39c0a626d82521b2d4128ec6f194814 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 26 Feb 2024 18:21:17 +0100 Subject: [PATCH 2/3] [refactor-put] send several blocks in parallel to storage nodes --- src/api/s3/put.rs | 49 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 557d1e5f..10a018e4 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; +use futures::stream::FuturesOrdered; use futures::try_join; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -37,6 +38,8 @@ use crate::helpers::*; use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; +const PUT_BLOCKS_MAX_PARALLEL: usize = 3; + pub async fn handle_put( garage: Arc, req: Request, @@ -376,12 +379,52 @@ pub(crate) async fn read_and_put_blocks> + }; let put_blocks = async { + // Structure for handling several concurrent writes to storage nodes + let mut write_futs = FuturesOrdered::new(); let mut written_bytes = 0u64; - while let Some(next) = block_rx3.recv().await { - let (block, hash) = next?; + loop { + // Simultaneously write blocks to storage nodes & await for next block to be written + let currently_running = write_futs.len(); + let write_futs_next = async { + if write_futs.is_empty() { + futures::future::pending().await + } else { + write_futs.next().await.unwrap() + } + }; + let recv_next = async { + // If more than a maximum number of writes are in progress, don't add more for now + if currently_running >= PUT_BLOCKS_MAX_PARALLEL { + futures::future::pending().await + } else { + block_rx3.recv().await + } + }; + let (block, hash) = tokio::select! { + result = write_futs_next => { + result?; + continue; + }, + recv = recv_next => match recv { + Some(next) => next?, + None => break, + }, + }; + + // For next block to be written: count its size and spawn future to write it let offset = written_bytes; written_bytes += block.len() as u64; - put_block_and_meta(garage, version, part_number, offset, hash, block).await?; + write_futs.push_back(put_block_and_meta( + garage, + version, + part_number, + offset, + hash, + block, + )); + } + while let Some(res) = write_futs.next().await { + res?; } Ok::<_, Error>(written_bytes) }; From b76c0c102ee07758ecf8ae4dfeef0a9a095c4136 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 26 Feb 2024 18:34:52 +0100 Subject: [PATCH 3/3] [refactor-put] add ordering tag to blocks being sent to storage nodes --- src/api/s3/copy.rs | 5 ++++- src/api/s3/put.rs | 8 +++++++- src/block/manager.rs | 12 +++++++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 7eb1fe60..880ce5f4 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -387,7 +387,10 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await + garage2 + .block_manager + .rpc_put_block(final_hash, data, None) + .await } else { Ok(()) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 10a018e4..489f1136 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -20,6 +20,7 @@ use opentelemetry::{ }; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -380,6 +381,7 @@ pub(crate) async fn read_and_put_blocks> + let put_blocks = async { // Structure for handling several concurrent writes to storage nodes + let order_stream = OrderTag::stream(); let mut write_futs = FuturesOrdered::new(); let mut written_bytes = 0u64; loop { @@ -421,6 +423,7 @@ pub(crate) async fn read_and_put_blocks> + offset, hash, block, + order_stream.order(written_bytes), )); } while let Some(res) = write_futs.next().await { @@ -450,6 +453,7 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + order_tag: OrderTag, ) -> Result<(), GarageError> { let mut version = version.clone(); version.blocks.put( @@ -470,7 +474,9 @@ async fn put_block_and_meta( }; futures::try_join!( - garage.block_manager.rpc_put_block(hash, block), + garage + .block_manager + .rpc_put_block(hash, block, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; diff --git a/src/block/manager.rs b/src/block/manager.rs index 817866f6..890ea8b7 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -346,7 +346,12 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { + pub async fn rpc_put_block( + &self, + hash: Hash, + data: Bytes, + order_tag: Option, + ) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) @@ -354,6 +359,11 @@ impl BlockManager { .into_parts(); let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); + let put_block_rpc = if let Some(tag) = order_tag { + put_block_rpc.with_order_tag(tag) + } else { + put_block_rpc + }; self.system .rpc