Small fixes and safety check
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
Alex 2022-01-12 08:54:18 +01:00
parent e9ba8f5e02
commit 84bfc19399
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
3 changed files with 55 additions and 12 deletions

View file

@ -222,8 +222,8 @@ pub async fn handle_upload_part_copy(
return Err(Error::NoSuchUpload);
}
// Get chunk list of source object
let source_version = match source_version_data {
// Check source version is not inlined
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, _bytes) => {
// This is only for small files, we don't bother handling this.
@ -233,13 +233,29 @@ pub async fn handle_upload_part_copy(
"Source object is too small (minimum part size is 5Mb)".into(),
));
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => garage
.version_table
.get(&source_object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?,
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
};
// Fetch source versin with its block list,
// and destination version to check part hasn't yet been uploaded
let (source_version, dest_version) = futures::try_join!(
garage
.version_table
.get(&source_object_version.uuid, &EmptyKey),
garage.version_table.get(&dest_version_uuid, &EmptyKey),
)?;
let source_version = source_version.ok_or(Error::NoSuchKey)?;
// Check this part number hasn't yet been uploaded
if let Some(dv) = dest_version {
if dv.has_part_number(part_number) {
return Err(Error::BadRequest(format!(
"Part number {} has already been uploaded",
part_number
)));
}
}
// We want to reuse blocks from the source version as much as possible.
// However, we still need to get the data from these blocks
// because we need to know it to calculate the MD5sum of the part
@ -275,8 +291,8 @@ pub async fn handle_upload_part_copy(
};
size_to_copy += range_to_copy
.as_ref()
.map(|x| x.len())
.unwrap_or(block.size as usize);
.map(|x| x.len() as u64)
.unwrap_or(block.size);
blocks_to_copy.push((block.hash, range_to_copy));
}
@ -305,7 +321,7 @@ pub async fn handle_upload_part_copy(
for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() {
let (current_block, subrange_hash) = match range_to_copy.clone() {
Some(r) => {
let subrange = block.take().unwrap().drain(r).collect::<Vec<u8>>();
let subrange = block.take().unwrap()[r].to_vec();
let hash = blake2sum(&subrange);
(subrange, hash)
}

View file

@ -370,8 +370,11 @@ pub async fn handle_put_part(
let key = key.to_string();
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
let (object, first_block) =
futures::try_join!(garage.object_table.get(&bucket_id, &key), chunker.next())?;
let (object, version, first_block) = futures::try_join!(
garage.object_table.get(&bucket_id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
chunker.next()
)?;
// Check object is valid and multipart block can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@ -385,6 +388,16 @@ pub async fn handle_put_part(
return Err(Error::NoSuchUpload);
}
// Check part hasn't already been uploaded
if let Some(v) = version {
if v.has_part_number(part_number) {
return Err(Error::BadRequest(format!(
"Part number {} has already been uploaded",
part_number
)));
}
}
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
let first_block_hash = blake2sum(&first_block[..]);

View file

@ -47,6 +47,20 @@ impl Version {
key,
}
}
pub fn has_part_number(&self, part_number: u64) -> bool {
let case1 = self
.parts_etags
.items()
.binary_search_by(|(k, _)| k.cmp(&part_number))
.is_ok();
let case2 = self
.blocks
.items()
.binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
.is_ok();
case1 || case2
}
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]