Compare commits

...

3 commits

Author SHA1 Message Date
70fc58bd37
fix clippy
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
2022-04-07 23:02:00 +02:00
dbd3082fc6
Prettier code for defragmentation
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing
2022-04-07 19:01:04 +02:00
cc404822e7
Defragmentation in UploadPartCopy: first pass (not pretty but it compiles) 2022-04-07 18:38:32 +02:00

View file

@ -1,7 +1,8 @@
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::TryFutureExt; use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5}; use md5::{Digest as Md5Digest, Md5};
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
@ -310,24 +311,46 @@ pub async fn handle_upload_part_copy(
// Now, actually copy the blocks // Now, actually copy the blocks
let mut md5hasher = Md5::new(); let mut md5hasher = Md5::new();
let mut block = Some( // First, create a stream that is able to read the source blocks
garage // and extract the subrange if necessary.
.block_manager // The second returned value is an Option<Hash>, that is Some
.rpc_get_block(&blocks_to_copy[0].0) // if and only if the block returned is a block that already existed
.await?, // in the Garage data store (thus we don't need to save it again).
); let garage2 = garage.clone();
let source_blocks = stream::iter(blocks_to_copy)
.flat_map(|(block_hash, range_to_copy)| {
let garage3 = garage2.clone();
stream::once(async move {
let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
match range_to_copy {
Some(r) => Ok((data[r].to_vec(), None)),
None => Ok((data, Some(block_hash))),
}
})
})
.peekable();
// The defragmenter is a custom stream (defined below) that concatenates
// consecutive block parts when they are too small.
// It returns a series of (Vec<u8>, Option<Hash>).
// When it is done, it returns an empty vec.
// Same as the previous iterator, the Option is Some(_) if and only if
// it's an existing block of the Garage data store.
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
let mut current_offset = 0; let mut current_offset = 0;
for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() { let mut next_block = defragmenter.next().await?;
let (current_block, subrange_hash) = match range_to_copy.clone() {
Some(r) => { loop {
let subrange = block.take().unwrap()[r].to_vec(); let (data, existing_block_hash) = next_block;
let hash = blake2sum(&subrange); if data.is_empty() {
(subrange, hash) break;
} }
None => (block.take().unwrap(), *block_hash),
}; md5hasher.update(&data[..]);
md5hasher.update(&current_block[..]);
let must_upload = existing_block_hash.is_none();
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
version.blocks.put( version.blocks.put(
@ -336,33 +359,25 @@ pub async fn handle_upload_part_copy(
offset: current_offset, offset: current_offset,
}, },
VersionBlock { VersionBlock {
hash: subrange_hash, hash: final_hash,
size: current_block.len() as u64, size: data.len() as u64,
}, },
); );
current_offset += current_block.len() as u64; current_offset += data.len() as u64;
let block_ref = BlockRef { let block_ref = BlockRef {
block: subrange_hash, block: final_hash,
version: dest_version_uuid, version: dest_version_uuid,
deleted: false.into(), deleted: false.into(),
}; };
let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h);
let garage2 = garage.clone(); let garage2 = garage.clone();
let garage3 = garage.clone(); let res = futures::try_join!(
let is_subrange = range_to_copy.is_some(); // Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
let (_, _, _, next_block) = futures::try_join!(
// Thing 1: if we are taking a subrange of the source block,
// we need to insert that subrange as a new block.
async move { async move {
if is_subrange { if must_upload {
garage2 garage2.block_manager.rpc_put_block(final_hash, data).await
.block_manager
.rpc_put_block(subrange_hash, current_block)
.await
} else { } else {
Ok(()) Ok(())
} }
@ -372,15 +387,9 @@ pub async fn handle_upload_part_copy(
// Thing 3: we need to add a block reference // Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref), garage.block_ref_table.insert(&block_ref),
// Thing 4: we need to prefetch the next block // Thing 4: we need to prefetch the next block
async move { defragmenter.next(),
match next_block_hash {
Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)),
None => Ok(None),
}
},
)?; )?;
next_block = res.3;
block = next_block;
} }
let data_md5sum = md5hasher.finalize(); let data_md5sum = md5hasher.finalize();
@ -553,6 +562,54 @@ impl CopyPreconditionHeaders {
} }
} }
type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
block_size: usize,
block_stream: Pin<Box<stream::Peekable<S>>>,
buffer: Vec<u8>,
hash: Option<Hash>,
}
impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self {
Self {
block_size,
block_stream,
buffer: vec![],
hash: None,
}
}
async fn next(&mut self) -> BlockStreamItem {
// Fill buffer while we can
while let Some(res) = self.block_stream.as_mut().peek().await {
let (peeked_next_block, _) = match res {
Ok(t) => t,
Err(_) => {
self.block_stream.next().await.unwrap()?;
unreachable!()
}
};
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
self.buffer = next_block;
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
} else {
let (next_block, _) = self.block_stream.next().await.unwrap()?;
self.buffer.extend(next_block);
self.hash = None;
}
}
Ok((std::mem::take(&mut self.buffer), self.hash.take()))
}
}
#[derive(Debug, Serialize, PartialEq)] #[derive(Debug, Serialize, PartialEq)]
pub struct CopyObjectResult { pub struct CopyObjectResult {
#[serde(rename = "LastModified")] #[serde(rename = "LastModified")]