rewrite read_and_put_block as a series of steps with channels #734

Merged
lx merged 3 commits from refactor-put into main 2024-02-26 17:52:45 +00:00
Showing only changes of commit babccd2ad3 - Show all commits

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::try_join;
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@ -37,6 +38,8 @@ use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<ReqBody>,
@ -376,12 +379,52 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
let put_blocks = async {
// Structure for handling several concurrent writes to storage nodes
let mut write_futs = FuturesOrdered::new();
let mut written_bytes = 0u64;
while let Some(next) = block_rx3.recv().await {
let (block, hash) = next?;
loop {
// Simultaneously write blocks to storage nodes & await for next block to be written
let currently_running = write_futs.len();
let write_futs_next = async {
if write_futs.is_empty() {
futures::future::pending().await
} else {
write_futs.next().await.unwrap()
}
};
let recv_next = async {
// If more than a maximum number of writes are in progress, don't add more for now
if currently_running >= PUT_BLOCKS_MAX_PARALLEL {
futures::future::pending().await
} else {
block_rx3.recv().await
}
};
let (block, hash) = tokio::select! {
result = write_futs_next => {
result?;
continue;
},
recv = recv_next => match recv {
Some(next) => next?,
None => break,
},
};
// For next block to be written: count its size and spawn future to write it
let offset = written_bytes;
written_bytes += block.len() as u64;
put_block_and_meta(garage, version, part_number, offset, hash, block).await?;
write_futs.push_back(put_block_and_meta(
garage,
version,
part_number,
offset,
hash,
block,
));
}
while let Some(res) = write_futs.next().await {
res?;
}
Ok::<_, Error>(written_bytes)
};