diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index f92dfcf10..f8a269c03 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::TryFutureExt; +use futures::{stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; use hyper::{Body, Request, Response}; @@ -298,77 +298,136 @@ pub async fn handle_upload_part_copy( // Now, actually copy the blocks let mut md5hasher = Md5::new(); - let mut block = Some( - garage - .block_manager - .rpc_get_block(&blocks_to_copy[0].0) - .await?, - ); + // First, create a stream that is able to read the source blocks + // and extract the subrange if necessary. + // The second returned value is an Option, that is Some + // if and only if the block returned is a block that already existed + // in the Garage data store (thus we don't need to save it again). + let garage2 = garage.clone(); + let source_blocks = stream::iter(blocks_to_copy) + .flat_map(|(block_hash, range_to_copy)| { + let garage3 = garage2.clone(); + stream::once(async move { + let data = garage3.block_manager.rpc_get_block(&block_hash).await?; + match range_to_copy { + Some(r) => Ok((data[r].to_vec(), None)), + None => Ok((data, Some(block_hash))), + } + }) + }) + .peekable(); + let mut source_blocks = Box::pin(source_blocks); + // Keep information about the next block we want to insert let mut current_offset = 0; - for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() { - let (current_block, subrange_hash) = match range_to_copy.clone() { - Some(r) => { - let subrange = block.take().unwrap()[r].to_vec(); - let hash = blake2sum(&subrange); - (subrange, hash) + let mut next_block: Option<(Vec, Option)> = None; + + // State of the defragmenter + let config_block_size = garage.config.block_size; + let mut defrag_buffer = vec![]; + let mut defrag_hash = None; + + // We loop a step that does two things concurrently: + // 1. if there is a block to be inserted in next_block, insert it + // 2. grab the next block (it might be composed of several sub-blocks + // to be concatenated to ensure defragmentation) + loop { + let insert_current_block = async { + if let Some((data, existing_block_hash)) = next_block { + md5hasher.update(&data[..]); + + let must_upload = existing_block_hash.is_none(); + let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); + + let mut version = + Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); + version.blocks.put( + VersionBlockKey { + part_number, + offset: current_offset, + }, + VersionBlock { + hash: final_hash, + size: data.len() as u64, + }, + ); + current_offset += data.len() as u64; + + let block_ref = BlockRef { + block: final_hash, + version: dest_version_uuid, + deleted: false.into(), + }; + + let garage2 = garage.clone(); + futures::try_join!( + // Thing 1: if the block is not exactly a block that existed before, + // we need to insert that data as a new block. + async move { + if must_upload { + garage2.block_manager.rpc_put_block(final_hash, data).await + } else { + Ok(()) + } + }, + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&version), + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref), + )?; + Ok(()) + } else { + Ok(()) } - None => (block.take().unwrap(), *block_hash), - }; - md5hasher.update(¤t_block[..]); - - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.blocks.put( - VersionBlockKey { - part_number, - offset: current_offset, - }, - VersionBlock { - hash: subrange_hash, - size: current_block.len() as u64, - }, - ); - current_offset += current_block.len() as u64; - - let block_ref = BlockRef { - block: subrange_hash, - version: dest_version_uuid, - deleted: false.into(), }; - let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h); + let get_next_block = async { + loop { + let tmpres: Option<&Result<(Vec, Option), garage_util::error::Error>> = + source_blocks.as_mut().peek().await; + if let Some(res) = tmpres { + let (next_block, _) = match res { + Ok(t) => t, + Err(_) => { + source_blocks.next().await.unwrap()?; + unreachable!() + } + }; - let garage2 = garage.clone(); - let garage3 = garage.clone(); - let is_subrange = range_to_copy.is_some(); - - let (_, _, _, next_block) = futures::try_join!( - // Thing 1: if we are taking a subrange of the source block, - // we need to insert that subrange as a new block. - async move { - if is_subrange { - garage2 - .block_manager - .rpc_put_block(subrange_hash, current_block) - .await + if defrag_buffer.is_empty() { + let (next_block, next_block_hash) = source_blocks.next().await.unwrap()?; + defrag_buffer = next_block; + defrag_hash = next_block_hash; + } else if defrag_buffer.len() + next_block.len() > config_block_size { + return Ok(( + std::mem::replace(&mut defrag_buffer, vec![]), + std::mem::replace(&mut defrag_hash, None), + )); + } else { + let (next_block, _) = source_blocks.next().await.unwrap()?; + defrag_buffer.extend(next_block); + defrag_hash = None; + } } else { - Ok(()) + return Ok::<_, garage_util::error::Error>(( + std::mem::replace(&mut defrag_buffer, vec![]), + std::mem::replace(&mut defrag_hash, None), + )); } - }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), - // Thing 4: we need to prefetch the next block - async move { - match next_block_hash { - Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)), - None => Ok(None), - } - }, - )?; + } + }; - block = next_block; + let tmp: Result<(_, (Vec, Option)), garage_util::error::Error> = futures::try_join!( + insert_current_block, + // Thing 4: we need to prefetch the next block + get_next_block, + ); + let tmp_block = tmp?.1; + + if tmp_block.0.is_empty() { + break; + } + next_block = Some(tmp_block); } let data_md5sum = md5hasher.finalize();