Prettier code for defragmentation
This commit is contained in:
parent
e5341ca47b
commit
c93008d333
1 changed files with 105 additions and 107 deletions
|
@ -1,7 +1,8 @@
|
||||||
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
use futures::{stream, StreamExt, TryFutureExt};
|
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
|
||||||
use md5::{Digest as Md5Digest, Md5};
|
use md5::{Digest as Md5Digest, Md5};
|
||||||
|
|
||||||
use hyper::{Body, Request, Response};
|
use hyper::{Body, Request, Response};
|
||||||
|
@ -316,118 +317,67 @@ pub async fn handle_upload_part_copy(
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.peekable();
|
.peekable();
|
||||||
let mut source_blocks = Box::pin(source_blocks);
|
|
||||||
|
|
||||||
// Keep information about the next block we want to insert
|
// The defragmenter is a custom stream (defined below) that concatenates
|
||||||
|
// consecutive block parts when they are too small.
|
||||||
|
// It returns a series of (Vec<u8>, Option<Hash>).
|
||||||
|
// When it is done, it returns an empty vec.
|
||||||
|
// Same as the previous iterator, the Option is Some(_) if and only if
|
||||||
|
// it's an existing block of the Garage data store.
|
||||||
|
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
|
||||||
|
|
||||||
let mut current_offset = 0;
|
let mut current_offset = 0;
|
||||||
let mut next_block: Option<(Vec<u8>, Option<Hash>)> = None;
|
let mut next_block = defragmenter.next().await?;
|
||||||
|
|
||||||
// State of the defragmenter
|
|
||||||
let config_block_size = garage.config.block_size;
|
|
||||||
let mut defrag_buffer = vec![];
|
|
||||||
let mut defrag_hash = None;
|
|
||||||
|
|
||||||
// We loop a step that does two things concurrently:
|
|
||||||
// 1. if there is a block to be inserted in next_block, insert it
|
|
||||||
// 2. grab the next block (it might be composed of several sub-blocks
|
|
||||||
// to be concatenated to ensure defragmentation)
|
|
||||||
loop {
|
loop {
|
||||||
let insert_current_block = async {
|
let (data, existing_block_hash) = next_block;
|
||||||
if let Some((data, existing_block_hash)) = next_block {
|
if data.is_empty() {
|
||||||
md5hasher.update(&data[..]);
|
|
||||||
|
|
||||||
let must_upload = existing_block_hash.is_none();
|
|
||||||
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
|
|
||||||
|
|
||||||
let mut version =
|
|
||||||
Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
|
|
||||||
version.blocks.put(
|
|
||||||
VersionBlockKey {
|
|
||||||
part_number,
|
|
||||||
offset: current_offset,
|
|
||||||
},
|
|
||||||
VersionBlock {
|
|
||||||
hash: final_hash,
|
|
||||||
size: data.len() as u64,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
current_offset += data.len() as u64;
|
|
||||||
|
|
||||||
let block_ref = BlockRef {
|
|
||||||
block: final_hash,
|
|
||||||
version: dest_version_uuid,
|
|
||||||
deleted: false.into(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let garage2 = garage.clone();
|
|
||||||
futures::try_join!(
|
|
||||||
// Thing 1: if the block is not exactly a block that existed before,
|
|
||||||
// we need to insert that data as a new block.
|
|
||||||
async move {
|
|
||||||
if must_upload {
|
|
||||||
garage2.block_manager.rpc_put_block(final_hash, data).await
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
},
|
|
||||||
// Thing 2: we need to insert the block in the version
|
|
||||||
garage.version_table.insert(&version),
|
|
||||||
// Thing 3: we need to add a block reference
|
|
||||||
garage.block_ref_table.insert(&block_ref),
|
|
||||||
)?;
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let get_next_block = async {
|
|
||||||
loop {
|
|
||||||
let tmpres: Option<&Result<(Vec<u8>, Option<Hash>), garage_util::error::Error>> =
|
|
||||||
source_blocks.as_mut().peek().await;
|
|
||||||
if let Some(res) = tmpres {
|
|
||||||
let (next_block, _) = match res {
|
|
||||||
Ok(t) => t,
|
|
||||||
Err(_) => {
|
|
||||||
source_blocks.next().await.unwrap()?;
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if defrag_buffer.is_empty() {
|
|
||||||
let (next_block, next_block_hash) = source_blocks.next().await.unwrap()?;
|
|
||||||
defrag_buffer = next_block;
|
|
||||||
defrag_hash = next_block_hash;
|
|
||||||
} else if defrag_buffer.len() + next_block.len() > config_block_size {
|
|
||||||
return Ok((
|
|
||||||
std::mem::replace(&mut defrag_buffer, vec![]),
|
|
||||||
std::mem::replace(&mut defrag_hash, None),
|
|
||||||
));
|
|
||||||
} else {
|
|
||||||
let (next_block, _) = source_blocks.next().await.unwrap()?;
|
|
||||||
defrag_buffer.extend(next_block);
|
|
||||||
defrag_hash = None;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Ok::<_, garage_util::error::Error>((
|
|
||||||
std::mem::replace(&mut defrag_buffer, vec![]),
|
|
||||||
std::mem::replace(&mut defrag_hash, None),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let tmp: Result<(_, (Vec<u8>, Option<Hash>)), garage_util::error::Error> = futures::try_join!(
|
|
||||||
insert_current_block,
|
|
||||||
// Thing 4: we need to prefetch the next block
|
|
||||||
get_next_block,
|
|
||||||
);
|
|
||||||
let tmp_block = tmp?.1;
|
|
||||||
|
|
||||||
if tmp_block.0.is_empty() {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
next_block = Some(tmp_block);
|
|
||||||
|
md5hasher.update(&data[..]);
|
||||||
|
|
||||||
|
let must_upload = existing_block_hash.is_none();
|
||||||
|
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
|
||||||
|
|
||||||
|
let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
|
||||||
|
version.blocks.put(
|
||||||
|
VersionBlockKey {
|
||||||
|
part_number,
|
||||||
|
offset: current_offset,
|
||||||
|
},
|
||||||
|
VersionBlock {
|
||||||
|
hash: final_hash,
|
||||||
|
size: data.len() as u64,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
current_offset += data.len() as u64;
|
||||||
|
|
||||||
|
let block_ref = BlockRef {
|
||||||
|
block: final_hash,
|
||||||
|
version: dest_version_uuid,
|
||||||
|
deleted: false.into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let garage2 = garage.clone();
|
||||||
|
let res = futures::try_join!(
|
||||||
|
// Thing 1: if the block is not exactly a block that existed before,
|
||||||
|
// we need to insert that data as a new block.
|
||||||
|
async move {
|
||||||
|
if must_upload {
|
||||||
|
garage2.block_manager.rpc_put_block(final_hash, data).await
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
},
|
||||||
|
// Thing 2: we need to insert the block in the version
|
||||||
|
garage.version_table.insert(&version),
|
||||||
|
// Thing 3: we need to add a block reference
|
||||||
|
garage.block_ref_table.insert(&block_ref),
|
||||||
|
// Thing 4: we need to prefetch the next block
|
||||||
|
defragmenter.next(),
|
||||||
|
)?;
|
||||||
|
next_block = res.3;
|
||||||
}
|
}
|
||||||
|
|
||||||
let data_md5sum = md5hasher.finalize();
|
let data_md5sum = md5hasher.finalize();
|
||||||
|
@ -600,6 +550,54 @@ impl CopyPreconditionHeaders {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
|
||||||
|
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
|
||||||
|
|
||||||
|
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
|
||||||
|
block_size: usize,
|
||||||
|
block_stream: Pin<Box<stream::Peekable<S>>>,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
hash: Option<Hash>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
|
||||||
|
fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self {
|
||||||
|
Self {
|
||||||
|
block_size,
|
||||||
|
block_stream,
|
||||||
|
buffer: vec![],
|
||||||
|
hash: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn next(&mut self) -> BlockStreamItem {
|
||||||
|
// Fill buffer while we can
|
||||||
|
while let Some(res) = self.block_stream.as_mut().peek().await {
|
||||||
|
let (peeked_next_block, _) = match res {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(_) => {
|
||||||
|
self.block_stream.next().await.unwrap()?;
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if self.buffer.is_empty() {
|
||||||
|
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
|
||||||
|
self.buffer = next_block;
|
||||||
|
self.hash = next_block_hash;
|
||||||
|
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
let (next_block, _) = self.block_stream.next().await.unwrap()?;
|
||||||
|
self.buffer.extend(next_block);
|
||||||
|
self.hash = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok((std::mem::take(&mut self.buffer), self.hash.take()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, PartialEq)]
|
#[derive(Debug, Serialize, PartialEq)]
|
||||||
pub struct CopyObjectResult {
|
pub struct CopyObjectResult {
|
||||||
#[serde(rename = "LastModified")]
|
#[serde(rename = "LastModified")]
|
||||||
|
|
Loading…
Reference in a new issue