Defragmentation in UploadPartCopy: first pass (not pretty but it compiles)
This commit is contained in:
parent
a4f9f19ac3
commit
e5341ca47b
1 changed files with 122 additions and 63 deletions
|
@ -1,7 +1,7 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use futures::{stream, StreamExt, TryFutureExt};
|
||||
use md5::{Digest as Md5Digest, Md5};
|
||||
|
||||
use hyper::{Body, Request, Response};
|
||||
|
@ -298,77 +298,136 @@ pub async fn handle_upload_part_copy(
|
|||
// Now, actually copy the blocks
|
||||
let mut md5hasher = Md5::new();
|
||||
|
||||
let mut block = Some(
|
||||
garage
|
||||
.block_manager
|
||||
.rpc_get_block(&blocks_to_copy[0].0)
|
||||
.await?,
|
||||
);
|
||||
// First, create a stream that is able to read the source blocks
|
||||
// and extract the subrange if necessary.
|
||||
// The second returned value is an Option<Hash>, that is Some
|
||||
// if and only if the block returned is a block that already existed
|
||||
// in the Garage data store (thus we don't need to save it again).
|
||||
let garage2 = garage.clone();
|
||||
let source_blocks = stream::iter(blocks_to_copy)
|
||||
.flat_map(|(block_hash, range_to_copy)| {
|
||||
let garage3 = garage2.clone();
|
||||
stream::once(async move {
|
||||
let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
|
||||
match range_to_copy {
|
||||
Some(r) => Ok((data[r].to_vec(), None)),
|
||||
None => Ok((data, Some(block_hash))),
|
||||
}
|
||||
})
|
||||
})
|
||||
.peekable();
|
||||
let mut source_blocks = Box::pin(source_blocks);
|
||||
|
||||
// Keep information about the next block we want to insert
|
||||
let mut current_offset = 0;
|
||||
for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() {
|
||||
let (current_block, subrange_hash) = match range_to_copy.clone() {
|
||||
Some(r) => {
|
||||
let subrange = block.take().unwrap()[r].to_vec();
|
||||
let hash = blake2sum(&subrange);
|
||||
(subrange, hash)
|
||||
let mut next_block: Option<(Vec<u8>, Option<Hash>)> = None;
|
||||
|
||||
// State of the defragmenter
|
||||
let config_block_size = garage.config.block_size;
|
||||
let mut defrag_buffer = vec![];
|
||||
let mut defrag_hash = None;
|
||||
|
||||
// We loop a step that does two things concurrently:
|
||||
// 1. if there is a block to be inserted in next_block, insert it
|
||||
// 2. grab the next block (it might be composed of several sub-blocks
|
||||
// to be concatenated to ensure defragmentation)
|
||||
loop {
|
||||
let insert_current_block = async {
|
||||
if let Some((data, existing_block_hash)) = next_block {
|
||||
md5hasher.update(&data[..]);
|
||||
|
||||
let must_upload = existing_block_hash.is_none();
|
||||
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
|
||||
|
||||
let mut version =
|
||||
Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
|
||||
version.blocks.put(
|
||||
VersionBlockKey {
|
||||
part_number,
|
||||
offset: current_offset,
|
||||
},
|
||||
VersionBlock {
|
||||
hash: final_hash,
|
||||
size: data.len() as u64,
|
||||
},
|
||||
);
|
||||
current_offset += data.len() as u64;
|
||||
|
||||
let block_ref = BlockRef {
|
||||
block: final_hash,
|
||||
version: dest_version_uuid,
|
||||
deleted: false.into(),
|
||||
};
|
||||
|
||||
let garage2 = garage.clone();
|
||||
futures::try_join!(
|
||||
// Thing 1: if the block is not exactly a block that existed before,
|
||||
// we need to insert that data as a new block.
|
||||
async move {
|
||||
if must_upload {
|
||||
garage2.block_manager.rpc_put_block(final_hash, data).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
// Thing 2: we need to insert the block in the version
|
||||
garage.version_table.insert(&version),
|
||||
// Thing 3: we need to add a block reference
|
||||
garage.block_ref_table.insert(&block_ref),
|
||||
)?;
|
||||
Ok(())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
None => (block.take().unwrap(), *block_hash),
|
||||
};
|
||||
md5hasher.update(¤t_block[..]);
|
||||
|
||||
let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
|
||||
version.blocks.put(
|
||||
VersionBlockKey {
|
||||
part_number,
|
||||
offset: current_offset,
|
||||
},
|
||||
VersionBlock {
|
||||
hash: subrange_hash,
|
||||
size: current_block.len() as u64,
|
||||
},
|
||||
);
|
||||
current_offset += current_block.len() as u64;
|
||||
|
||||
let block_ref = BlockRef {
|
||||
block: subrange_hash,
|
||||
version: dest_version_uuid,
|
||||
deleted: false.into(),
|
||||
};
|
||||
|
||||
let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h);
|
||||
let get_next_block = async {
|
||||
loop {
|
||||
let tmpres: Option<&Result<(Vec<u8>, Option<Hash>), garage_util::error::Error>> =
|
||||
source_blocks.as_mut().peek().await;
|
||||
if let Some(res) = tmpres {
|
||||
let (next_block, _) = match res {
|
||||
Ok(t) => t,
|
||||
Err(_) => {
|
||||
source_blocks.next().await.unwrap()?;
|
||||
unreachable!()
|
||||
}
|
||||
};
|
||||
|
||||
let garage2 = garage.clone();
|
||||
let garage3 = garage.clone();
|
||||
let is_subrange = range_to_copy.is_some();
|
||||
|
||||
let (_, _, _, next_block) = futures::try_join!(
|
||||
// Thing 1: if we are taking a subrange of the source block,
|
||||
// we need to insert that subrange as a new block.
|
||||
async move {
|
||||
if is_subrange {
|
||||
garage2
|
||||
.block_manager
|
||||
.rpc_put_block(subrange_hash, current_block)
|
||||
.await
|
||||
if defrag_buffer.is_empty() {
|
||||
let (next_block, next_block_hash) = source_blocks.next().await.unwrap()?;
|
||||
defrag_buffer = next_block;
|
||||
defrag_hash = next_block_hash;
|
||||
} else if defrag_buffer.len() + next_block.len() > config_block_size {
|
||||
return Ok((
|
||||
std::mem::replace(&mut defrag_buffer, vec![]),
|
||||
std::mem::replace(&mut defrag_hash, None),
|
||||
));
|
||||
} else {
|
||||
let (next_block, _) = source_blocks.next().await.unwrap()?;
|
||||
defrag_buffer.extend(next_block);
|
||||
defrag_hash = None;
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
return Ok::<_, garage_util::error::Error>((
|
||||
std::mem::replace(&mut defrag_buffer, vec![]),
|
||||
std::mem::replace(&mut defrag_hash, None),
|
||||
));
|
||||
}
|
||||
},
|
||||
// Thing 2: we need to insert the block in the version
|
||||
garage.version_table.insert(&version),
|
||||
// Thing 3: we need to add a block reference
|
||||
garage.block_ref_table.insert(&block_ref),
|
||||
// Thing 4: we need to prefetch the next block
|
||||
async move {
|
||||
match next_block_hash {
|
||||
Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
},
|
||||
)?;
|
||||
}
|
||||
};
|
||||
|
||||
block = next_block;
|
||||
let tmp: Result<(_, (Vec<u8>, Option<Hash>)), garage_util::error::Error> = futures::try_join!(
|
||||
insert_current_block,
|
||||
// Thing 4: we need to prefetch the next block
|
||||
get_next_block,
|
||||
);
|
||||
let tmp_block = tmp?.1;
|
||||
|
||||
if tmp_block.0.is_empty() {
|
||||
break;
|
||||
}
|
||||
next_block = Some(tmp_block);
|
||||
}
|
||||
|
||||
let data_md5sum = md5hasher.finalize();
|
||||
|
|
Loading…
Reference in a new issue