K2V #293
3 changed files with 121 additions and 59 deletions
|
@ -1,7 +1,8 @@
|
|||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
|
||||
use md5::{Digest as Md5Digest, Md5};
|
||||
|
||||
use hyper::{Body, Request, Response};
|
||||
|
@ -268,7 +269,6 @@ pub async fn handle_upload_part_copy(
|
|||
|
||||
let mut blocks_to_copy = vec![];
|
||||
let mut current_offset = 0;
|
||||
let mut size_to_copy = 0;
|
||||
for (_bk, block) in source_version.blocks.items().iter() {
|
||||
let (block_begin, block_end) = (current_offset, current_offset + block.size);
|
||||
|
||||
|
@ -289,10 +289,6 @@ pub async fn handle_upload_part_copy(
|
|||
(Some(b), None) => Some(b as usize..block.size as usize),
|
||||
(None, None) => None,
|
||||
};
|
||||
size_to_copy += range_to_copy
|
||||
.as_ref()
|
||||
.map(|x| x.len() as u64)
|
||||
.unwrap_or(block.size);
|
||||
|
||||
blocks_to_copy.push((block.hash, range_to_copy));
|
||||
}
|
||||
|
@ -300,34 +296,49 @@ pub async fn handle_upload_part_copy(
|
|||
current_offset = block_end;
|
||||
}
|
||||
|
||||
if size_to_copy < 1024 * 1024 {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Not enough data to copy: {} bytes (minimum: 1MB)",
|
||||
size_to_copy
|
||||
)));
|
||||
}
|
||||
|
||||
// Now, actually copy the blocks
|
||||
let mut md5hasher = Md5::new();
|
||||
|
||||
let mut block = Some(
|
||||
garage
|
||||
.block_manager
|
||||
.rpc_get_block(&blocks_to_copy[0].0)
|
||||
.await?,
|
||||
);
|
||||
// First, create a stream that is able to read the source blocks
|
||||
// and extract the subrange if necessary.
|
||||
// The second returned value is an Option<Hash>, that is Some
|
||||
// if and only if the block returned is a block that already existed
|
||||
// in the Garage data store (thus we don't need to save it again).
|
||||
let garage2 = garage.clone();
|
||||
let source_blocks = stream::iter(blocks_to_copy)
|
||||
.flat_map(|(block_hash, range_to_copy)| {
|
||||
let garage3 = garage2.clone();
|
||||
stream::once(async move {
|
||||
let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
|
||||
match range_to_copy {
|
||||
Some(r) => Ok((data[r].to_vec(), None)),
|
||||
None => Ok((data, Some(block_hash))),
|
||||
}
|
||||
})
|
||||
})
|
||||
.peekable();
|
||||
|
||||
// The defragmenter is a custom stream (defined below) that concatenates
|
||||
// consecutive block parts when they are too small.
|
||||
// It returns a series of (Vec<u8>, Option<Hash>).
|
||||
// When it is done, it returns an empty vec.
|
||||
// Same as the previous iterator, the Option is Some(_) if and only if
|
||||
// it's an existing block of the Garage data store.
|
||||
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
|
||||
|
||||
let mut current_offset = 0;
|
||||
for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() {
|
||||
let (current_block, subrange_hash) = match range_to_copy.clone() {
|
||||
Some(r) => {
|
||||
let subrange = block.take().unwrap()[r].to_vec();
|
||||
let hash = blake2sum(&subrange);
|
||||
(subrange, hash)
|
||||
let mut next_block = defragmenter.next().await?;
|
||||
|
||||
loop {
|
||||
let (data, existing_block_hash) = next_block;
|
||||
if data.is_empty() {
|
||||
break;
|
||||
}
|
||||
None => (block.take().unwrap(), *block_hash),
|
||||
};
|
||||
md5hasher.update(¤t_block[..]);
|
||||
|
||||
md5hasher.update(&data[..]);
|
||||
|
||||
let must_upload = existing_block_hash.is_none();
|
||||
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
|
||||
|
||||
let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false);
|
||||
version.blocks.put(
|
||||
|
@ -336,33 +347,25 @@ pub async fn handle_upload_part_copy(
|
|||
offset: current_offset,
|
||||
},
|
||||
VersionBlock {
|
||||
hash: subrange_hash,
|
||||
size: current_block.len() as u64,
|
||||
hash: final_hash,
|
||||
size: data.len() as u64,
|
||||
},
|
||||
);
|
||||
current_offset += current_block.len() as u64;
|
||||
current_offset += data.len() as u64;
|
||||
|
||||
let block_ref = BlockRef {
|
||||
block: subrange_hash,
|
||||
block: final_hash,
|
||||
version: dest_version_uuid,
|
||||
deleted: false.into(),
|
||||
};
|
||||
|
||||
let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h);
|
||||
|
||||
let garage2 = garage.clone();
|
||||
let garage3 = garage.clone();
|
||||
let is_subrange = range_to_copy.is_some();
|
||||
|
||||
let (_, _, _, next_block) = futures::try_join!(
|
||||
// Thing 1: if we are taking a subrange of the source block,
|
||||
// we need to insert that subrange as a new block.
|
||||
let res = futures::try_join!(
|
||||
// Thing 1: if the block is not exactly a block that existed before,
|
||||
// we need to insert that data as a new block.
|
||||
async move {
|
||||
if is_subrange {
|
||||
garage2
|
||||
.block_manager
|
||||
.rpc_put_block(subrange_hash, current_block)
|
||||
.await
|
||||
if must_upload {
|
||||
garage2.block_manager.rpc_put_block(final_hash, data).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
|
@ -372,15 +375,9 @@ pub async fn handle_upload_part_copy(
|
|||
// Thing 3: we need to add a block reference
|
||||
garage.block_ref_table.insert(&block_ref),
|
||||
// Thing 4: we need to prefetch the next block
|
||||
async move {
|
||||
match next_block_hash {
|
||||
Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
},
|
||||
defragmenter.next(),
|
||||
)?;
|
||||
|
||||
block = next_block;
|
||||
next_block = res.3;
|
||||
}
|
||||
|
||||
let data_md5sum = md5hasher.finalize();
|
||||
|
@ -553,6 +550,54 @@ impl CopyPreconditionHeaders {
|
|||
}
|
||||
}
|
||||
|
||||
type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
|
||||
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
|
||||
|
||||
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
|
||||
block_size: usize,
|
||||
block_stream: Pin<Box<stream::Peekable<S>>>,
|
||||
buffer: Vec<u8>,
|
||||
hash: Option<Hash>,
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
|
||||
fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self {
|
||||
Self {
|
||||
block_size,
|
||||
block_stream,
|
||||
buffer: vec![],
|
||||
hash: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> BlockStreamItem {
|
||||
// Fill buffer while we can
|
||||
while let Some(res) = self.block_stream.as_mut().peek().await {
|
||||
let (peeked_next_block, _) = match res {
|
||||
Ok(t) => t,
|
||||
Err(_) => {
|
||||
self.block_stream.next().await.unwrap()?;
|
||||
unreachable!()
|
||||
}
|
||||
};
|
||||
|
||||
if self.buffer.is_empty() {
|
||||
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
|
||||
self.buffer = next_block;
|
||||
self.hash = next_block_hash;
|
||||
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
|
||||
break;
|
||||
} else {
|
||||
let (next_block, _) = self.block_stream.next().await.unwrap()?;
|
||||
self.buffer.extend(next_block);
|
||||
self.hash = None;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((std::mem::take(&mut self.buffer), self.hash.take()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, PartialEq)]
|
||||
pub struct CopyObjectResult {
|
||||
#[serde(rename = "LastModified")]
|
||||
|
|
|
@ -43,14 +43,22 @@ pub async fn cmd_assign_role(
|
|||
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
|
||||
};
|
||||
|
||||
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||
|
||||
let added_nodes = args
|
||||
.node_ids
|
||||
.iter()
|
||||
.map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
|
||||
.map(|node_id| {
|
||||
find_matching_node(
|
||||
status
|
||||
.iter()
|
||||
.map(|adv| adv.id)
|
||||
.chain(layout.node_ids().iter().cloned()),
|
||||
node_id,
|
||||
)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||
|
||||
let mut roles = layout.roles.clone();
|
||||
roles.merge(&layout.staging);
|
||||
|
||||
|
@ -323,11 +331,20 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
|
|||
}
|
||||
|
||||
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
|
||||
if !layout.staging.items().is_empty() {
|
||||
let has_changes = layout
|
||||
.staging
|
||||
.items()
|
||||
.iter()
|
||||
.any(|(k, _, v)| layout.roles.get(k) != Some(v));
|
||||
|
||||
if has_changes {
|
||||
println!();
|
||||
println!("==== STAGED ROLE CHANGES ====");
|
||||
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
|
||||
for (id, _, role) in layout.staging.items().iter() {
|
||||
if layout.roles.get(id) == Some(role) {
|
||||
continue;
|
||||
}
|
||||
if let Some(role) = &role.0 {
|
||||
let tags = role.tags.join(",");
|
||||
table.push(format!(
|
||||
|
|
|
@ -208,7 +208,7 @@ pub fn find_matching_node(
|
|||
) -> Result<Uuid, Error> {
|
||||
let mut candidates = vec![];
|
||||
for c in cand {
|
||||
if hex::encode(&c).starts_with(&pattern) {
|
||||
if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
|
||||
candidates.push(c);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue