diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 0d27f635..4e94d887 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -1,7 +1,8 @@
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
-use futures::TryFutureExt;
+use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
 use md5::{Digest as Md5Digest, Md5};
 
 use hyper::{Body, Request, Response};
@@ -268,7 +269,6 @@ pub async fn handle_upload_part_copy(
 
 	let mut blocks_to_copy = vec![];
 	let mut current_offset = 0;
-	let mut size_to_copy = 0;
 	for (_bk, block) in source_version.blocks.items().iter() {
 		let (block_begin, block_end) = (current_offset, current_offset + block.size);
 
@@ -289,10 +289,6 @@ pub async fn handle_upload_part_copy(
 				(Some(b), None) => Some(b as usize..block.size as usize),
 				(None, None) => None,
 			};
-			size_to_copy += range_to_copy
-				.as_ref()
-				.map(|x| x.len() as u64)
-				.unwrap_or(block.size);
 			blocks_to_copy.push((block.hash, range_to_copy));
 		}
 
@@ -300,34 +296,49 @@ pub async fn handle_upload_part_copy(
 		current_offset = block_end;
 	}
 
-	if size_to_copy < 1024 * 1024 {
-		return Err(Error::BadRequest(format!(
-			"Not enough data to copy: {} bytes (minimum: 1MB)",
-			size_to_copy
-		)));
-	}
-
 	// Now, actually copy the blocks
 	let mut md5hasher = Md5::new();
 
-	let mut block = Some(
-		garage
-			.block_manager
-			.rpc_get_block(&blocks_to_copy[0].0)
-			.await?,
-	);
+	// First, create a stream that is able to read the source blocks
+	// and extract the subrange if necessary.
+	// The second returned value is an Option, that is Some
+	// if and only if the block returned is a block that already existed
+	// in the Garage data store (thus we don't need to save it again).
+	let garage2 = garage.clone();
+	let source_blocks = stream::iter(blocks_to_copy)
+		.flat_map(|(block_hash, range_to_copy)| {
+			let garage3 = garage2.clone();
+			stream::once(async move {
+				let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+				match range_to_copy {
+					Some(r) => Ok((data[r].to_vec(), None)),
+					None => Ok((data, Some(block_hash))),
+				}
+			})
+		})
+		.peekable();
+
+	// The defragmenter is a custom stream (defined below) that concatenates
+	// consecutive block parts when they are too small.
+	// It returns a series of (Vec<u8>, Option<Hash>).
+	// When it is done, it returns an empty vec.
+	// Same as the previous iterator, the Option is Some(_) if and only if
+	// it's an existing block of the Garage data store.
+	let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
 
 	let mut current_offset = 0;
-	for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() {
-		let (current_block, subrange_hash) = match range_to_copy.clone() {
-			Some(r) => {
-				let subrange = block.take().unwrap()[r].to_vec();
-				let hash = blake2sum(&subrange);
-				(subrange, hash)
-			}
-			None => (block.take().unwrap(), *block_hash),
-		};
-		md5hasher.update(&current_block[..]);
+	let mut next_block = defragmenter.next().await?;
+
+	loop {
+		let (data, existing_block_hash) = next_block;
+		if data.is_empty() {
+			break;
+		}
+
+		md5hasher.update(&data[..]);
+
+		let must_upload = existing_block_hash.is_none();
+		let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
 
 		let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
 		version.blocks.put(
@@ -336,33 +347,25 @@ pub async fn handle_upload_part_copy(
 				offset: current_offset,
 			},
 			VersionBlock {
-				hash: subrange_hash,
-				size: current_block.len() as u64,
+				hash: final_hash,
+				size: data.len() as u64,
 			},
 		);
 
-		current_offset += current_block.len() as u64;
+		current_offset += data.len() as u64;
 
 		let block_ref = BlockRef {
-			block: subrange_hash,
+			block: final_hash,
 			version: dest_version_uuid,
 			deleted: false.into(),
 		};
 
-		let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h);
-		let garage2 = garage.clone();
-		let garage3 = garage.clone();
-		let is_subrange = range_to_copy.is_some();
-
-		let (_, _, _, next_block) = futures::try_join!(
-			// Thing 1: if we are taking a subrange of the source block,
-			// we need to insert that subrange as a new block.
+		let res = futures::try_join!(
+			// Thing 1: if the block is not exactly a block that existed before,
+			// we need to insert that data as a new block.
 			async move {
-				if is_subrange {
-					garage2
-						.block_manager
-						.rpc_put_block(subrange_hash, current_block)
-						.await
+				if must_upload {
+					garage2.block_manager.rpc_put_block(final_hash, data).await
 				} else {
 					Ok(())
 				}
@@ -372,15 +375,9 @@ pub async fn handle_upload_part_copy(
 			// Thing 3: we need to add a block reference
 			garage.block_ref_table.insert(&block_ref),
 			// Thing 4: we need to prefetch the next block
-			async move {
-				match next_block_hash {
-					Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)),
-					None => Ok(None),
-				}
-			},
+			defragmenter.next(),
 		)?;
-
-		block = next_block;
+		next_block = res.3;
 	}
 
 	let data_md5sum = md5hasher.finalize();
@@ -553,6 +550,54 @@ impl CopyPreconditionHeaders {
 	}
 }
 
+type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
+
+struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
+	block_size: usize,
+	block_stream: Pin<Box<stream::Peekable<S>>>,
+	buffer: Vec<u8>,
+	hash: Option<Hash>,
+}
+
+impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
+	fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self {
+		Self {
+			block_size,
+			block_stream,
+			buffer: vec![],
+			hash: None,
+		}
+	}
+
+	async fn next(&mut self) -> BlockStreamItem {
+		// Fill buffer while we can
+		while let Some(res) = self.block_stream.as_mut().peek().await {
+			let (peeked_next_block, _) = match res {
+				Ok(t) => t,
+				Err(_) => {
+					self.block_stream.next().await.unwrap()?;
+					unreachable!()
+				}
+			};
+
+			if self.buffer.is_empty() {
+				let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
+				self.buffer = next_block;
+				self.hash = next_block_hash;
+			} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
+				break;
+			} else {
+				let (next_block, _) = self.block_stream.next().await.unwrap()?;
+				self.buffer.extend(next_block);
+				self.hash = None;
+			}
+		}
+
+		Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+	}
+}
+
 #[derive(Debug, Serialize, PartialEq)]
 pub struct CopyObjectResult {
 	#[serde(rename = "LastModified")]
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index e76f7737..88941d78 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -43,14 +43,22 @@ pub async fn cmd_assign_role(
 		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
 	};
 
+	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
 	let added_nodes = args
 		.node_ids
 		.iter()
-		.map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
+		.map(|node_id| {
+			find_matching_node(
+				status
+					.iter()
+					.map(|adv| adv.id)
+					.chain(layout.node_ids().iter().cloned()),
+				node_id,
+			)
+		})
 		.collect::<Result<Vec<_>, _>>()?;
 
-	let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
 	let mut roles = layout.roles.clone();
 	roles.merge(&layout.staging);
 
@@ -323,11 +331,20 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
 }
 
 pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
-	if !layout.staging.items().is_empty() {
+	let has_changes = layout
+		.staging
+		.items()
+		.iter()
+		.any(|(k, _, v)| layout.roles.get(k) != Some(v));
+
+	if has_changes {
 		println!();
 		println!("==== STAGED ROLE CHANGES ====");
 		let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
 		for (id, _, role) in layout.staging.items().iter() {
+			if layout.roles.get(id) == Some(role) {
+				continue;
+			}
 			if let Some(role) = &role.0 {
 				let tags = role.tags.join(",");
 				table.push(format!(
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 7d496507..fe11ad44 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -208,7 +208,7 @@ pub fn find_matching_node(
 ) -> Result<Uuid, Error> {
 	let mut candidates = vec![];
 	for c in cand {
-		if hex::encode(&c).starts_with(&pattern) {
+		if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
 			candidates.push(c);
 		}
 	}
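
For review purposes, here is a minimal, self-contained sketch of the defragmentation contract that the `Defragmenter` in this patch implements. It is deliberately simplified: a plain synchronous `Iterator` stands in for the async `Stream`, and a `u64` stands in for Garage's `Hash` type; the names `Part`, `SimpleDefragmenter` and `next_chunk` are illustrative only and do not exist in the codebase.

```rust
// (data, Some(hash)) iff `data` is an unmodified block the store already has.
type Part = (Vec<u8>, Option<u64>);

struct SimpleDefragmenter<I: Iterator<Item = Part>> {
    block_size: usize,
    parts: std::iter::Peekable<I>,
}

impl<I: Iterator<Item = Part>> SimpleDefragmenter<I> {
    // Returns the next defragmented chunk; an empty Vec signals end-of-stream,
    // mirroring the patch's `Defragmenter::next`.
    fn next_chunk(&mut self) -> Part {
        let mut buffer = Vec::new();
        let mut hash = None;
        while let Some((peeked, _)) = self.parts.peek() {
            if buffer.is_empty() {
                // A part taken whole keeps its hash (and may exceed block_size).
                let (data, h) = self.parts.next().unwrap();
                buffer = data;
                hash = h;
            } else if buffer.len() + peeked.len() > self.block_size {
                break; // merging the next part would overshoot the target size
            } else {
                // Concatenating produces new data, so any known hash is invalidated.
                let (data, _) = self.parts.next().unwrap();
                buffer.extend(data);
                hash = None;
            }
        }
        (buffer, hash)
    }
}

fn main() {
    // Two small subranges (merged together), then one whole stored block (kept as-is).
    let parts = vec![
        (vec![0u8; 100], None),
        (vec![1u8; 200], None),
        (vec![2u8; 1024], Some(42)),
    ];
    let mut defrag = SimpleDefragmenter {
        block_size: 512,
        parts: parts.into_iter().peekable(),
    };
    loop {
        let (data, hash) = defrag.next_chunk();
        if data.is_empty() {
            break;
        }
        // Prints: 300 bytes / None, then 1024 bytes / Some(42).
        println!("chunk: {} bytes, reusable hash: {:?}", data.len(), hash);
    }
}
```

The invariant is the one the copy loop above relies on: a hash is only forwarded when a stored block passes through unmodified, so the `must_upload` branch ("Thing 1") can skip the redundant `rpc_put_block` for blocks Garage already has.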