diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index f8a269c0..1b2573a1 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -1,7 +1,8 @@ +use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::{stream, StreamExt, TryFutureExt}; +use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; use hyper::{Body, Request, Response}; @@ -316,118 +317,67 @@ pub async fn handle_upload_part_copy( }) }) .peekable(); - let mut source_blocks = Box::pin(source_blocks); - // Keep information about the next block we want to insert + // The defragmenter is a custom stream (defined below) that concatenates + // consecutive block parts when they are too small. + // It returns a series of (Vec, Option). + // When it is done, it returns an empty vec. + // Same as the previous iterator, the Option is Some(_) if and only if + // it's an existing block of the Garage data store. + let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks)); + let mut current_offset = 0; - let mut next_block: Option<(Vec, Option)> = None; + let mut next_block = defragmenter.next().await?; - // State of the defragmenter - let config_block_size = garage.config.block_size; - let mut defrag_buffer = vec![]; - let mut defrag_hash = None; - - // We loop a step that does two things concurrently: - // 1. if there is a block to be inserted in next_block, insert it - // 2. grab the next block (it might be composed of several sub-blocks - // to be concatenated to ensure defragmentation) loop { - let insert_current_block = async { - if let Some((data, existing_block_hash)) = next_block { - md5hasher.update(&data[..]); - - let must_upload = existing_block_hash.is_none(); - let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); - - let mut version = - Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.blocks.put( - VersionBlockKey { - part_number, - offset: current_offset, - }, - VersionBlock { - hash: final_hash, - size: data.len() as u64, - }, - ); - current_offset += data.len() as u64; - - let block_ref = BlockRef { - block: final_hash, - version: dest_version_uuid, - deleted: false.into(), - }; - - let garage2 = garage.clone(); - futures::try_join!( - // Thing 1: if the block is not exactly a block that existed before, - // we need to insert that data as a new block. - async move { - if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await - } else { - Ok(()) - } - }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), - )?; - Ok(()) - } else { - Ok(()) - } - }; - - let get_next_block = async { - loop { - let tmpres: Option<&Result<(Vec, Option), garage_util::error::Error>> = - source_blocks.as_mut().peek().await; - if let Some(res) = tmpres { - let (next_block, _) = match res { - Ok(t) => t, - Err(_) => { - source_blocks.next().await.unwrap()?; - unreachable!() - } - }; - - if defrag_buffer.is_empty() { - let (next_block, next_block_hash) = source_blocks.next().await.unwrap()?; - defrag_buffer = next_block; - defrag_hash = next_block_hash; - } else if defrag_buffer.len() + next_block.len() > config_block_size { - return Ok(( - std::mem::replace(&mut defrag_buffer, vec![]), - std::mem::replace(&mut defrag_hash, None), - )); - } else { - let (next_block, _) = source_blocks.next().await.unwrap()?; - defrag_buffer.extend(next_block); - defrag_hash = None; - } - } else { - return Ok::<_, garage_util::error::Error>(( - std::mem::replace(&mut defrag_buffer, vec![]), - std::mem::replace(&mut defrag_hash, None), - )); - } - } - }; - - let tmp: Result<(_, (Vec, Option)), garage_util::error::Error> = futures::try_join!( - insert_current_block, - // Thing 4: we need to prefetch the next block - get_next_block, - ); - let tmp_block = tmp?.1; - - if tmp_block.0.is_empty() { + let (data, existing_block_hash) = next_block; + if data.is_empty() { break; } - next_block = Some(tmp_block); + + md5hasher.update(&data[..]); + + let must_upload = existing_block_hash.is_none(); + let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); + + let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); + version.blocks.put( + VersionBlockKey { + part_number, + offset: current_offset, + }, + VersionBlock { + hash: final_hash, + size: data.len() as u64, + }, + ); + current_offset += data.len() as u64; + + let block_ref = BlockRef { + block: final_hash, + version: dest_version_uuid, + deleted: false.into(), + }; + + let garage2 = garage.clone(); + let res = futures::try_join!( + // Thing 1: if the block is not exactly a block that existed before, + // we need to insert that data as a new block. + async move { + if must_upload { + garage2.block_manager.rpc_put_block(final_hash, data).await + } else { + Ok(()) + } + }, + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&version), + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref), + // Thing 4: we need to prefetch the next block + defragmenter.next(), + )?; + next_block = res.3; } let data_md5sum = md5hasher.finalize(); @@ -600,6 +550,54 @@ impl CopyPreconditionHeaders { } } +type BlockStreamItemOk = (Vec, Option); +type BlockStreamItem = Result; + +struct Defragmenter> { + block_size: usize, + block_stream: Pin>>, + buffer: Vec, + hash: Option, +} + +impl> Defragmenter { + fn new(block_size: usize, block_stream: Pin>>) -> Self { + Self { + block_size, + block_stream, + buffer: vec![], + hash: None, + } + } + + async fn next(&mut self) -> BlockStreamItem { + // Fill buffer while we can + while let Some(res) = self.block_stream.as_mut().peek().await { + let (peeked_next_block, _) = match res { + Ok(t) => t, + Err(_) => { + self.block_stream.next().await.unwrap()?; + unreachable!() + } + }; + + if self.buffer.is_empty() { + let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; + self.buffer = next_block; + self.hash = next_block_hash; + } else if self.buffer.len() + peeked_next_block.len() > self.block_size { + break; + } else { + let (next_block, _) = self.block_stream.next().await.unwrap()?; + self.buffer.extend(next_block); + self.hash = None; + } + } + + return Ok((std::mem::take(&mut self.buffer), self.hash.take())); + } +} + #[derive(Debug, Serialize, PartialEq)] pub struct CopyObjectResult { #[serde(rename = "LastModified")]