WIP add content defined chunking #42
2 changed files with 28 additions and 28 deletions
|
@ -52,14 +52,14 @@ pub async fn handle_put(
|
||||||
if first_block.len() < INLINE_THRESHOLD {
|
if first_block.len() < INLINE_THRESHOLD {
|
||||||
let mut md5sum = Md5::new();
|
let mut md5sum = Md5::new();
|
||||||
md5sum.update(&first_block[..]);
|
md5sum.update(&first_block[..]);
|
||||||
let md5sum_arr = md5sum.finalize();
|
let data_md5sum = md5sum.finalize();
|
||||||
let md5sum_hex = hex::encode(md5sum_arr);
|
let data_md5sum_hex = hex::encode(data_md5sum);
|
||||||
|
|
||||||
let sha256sum_hash = sha256sum(&first_block[..]);
|
let data_sha256sum = sha256sum(&first_block[..]);
|
||||||
|
|
||||||
ensure_checksum_matches(
|
ensure_checksum_matches(
|
||||||
md5sum_arr.as_slice(),
|
data_md5sum.as_slice(),
|
||||||
sha256sum_hash,
|
data_sha256sum,
|
||||||
content_md5.as_deref(),
|
content_md5.as_deref(),
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
@ -71,7 +71,7 @@ pub async fn handle_put(
|
||||||
ObjectVersionMeta {
|
ObjectVersionMeta {
|
||||||
headers,
|
headers,
|
||||||
size: first_block.len() as u64,
|
size: first_block.len() as u64,
|
||||||
etag: md5sum_hex.clone(),
|
etag: data_md5sum_hex.clone(),
|
||||||
},
|
},
|
||||||
first_block,
|
first_block,
|
||||||
)),
|
)),
|
||||||
|
@ -80,7 +80,7 @@ pub async fn handle_put(
|
||||||
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
|
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
return Ok(put_response(version_uuid, md5sum_hex));
|
return Ok(put_response(version_uuid, data_md5sum_hex));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write version identifier in object table so that we have a trace
|
// Write version identifier in object table so that we have a trace
|
||||||
|
@ -95,7 +95,7 @@ pub async fn handle_put(
|
||||||
|
|
||||||
// Initialize corresponding entry in version table
|
// Initialize corresponding entry in version table
|
||||||
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
|
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
|
||||||
let first_block_hash = sha256sum(&first_block[..]);
|
let first_block_hash = blake2sum(&first_block[..]);
|
||||||
|
|
||||||
// Transfer data and verify checksum
|
// Transfer data and verify checksum
|
||||||
let tx_result = read_and_put_blocks(
|
let tx_result = read_and_put_blocks(
|
||||||
|
@ -107,14 +107,14 @@ pub async fn handle_put(
|
||||||
&mut chunker,
|
&mut chunker,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.and_then(|(total_size, md5sum_arr, sha256sum)| {
|
.and_then(|(total_size, data_md5sum, data_sha256sum)| {
|
||||||
ensure_checksum_matches(
|
ensure_checksum_matches(
|
||||||
md5sum_arr.as_slice(),
|
data_md5sum.as_slice(),
|
||||||
sha256sum,
|
data_sha256sum,
|
||||||
content_md5.as_deref(),
|
content_md5.as_deref(),
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)
|
)
|
||||||
.map(|()| (total_size, md5sum_arr))
|
.map(|()| (total_size, data_md5sum))
|
||||||
});
|
});
|
||||||
|
|
||||||
// If something went wrong, clean up
|
// If something went wrong, clean up
|
||||||
|
@ -148,13 +148,13 @@ pub async fn handle_put(
|
||||||
/// Validate MD5 sum against content-md5 header
|
/// Validate MD5 sum against content-md5 header
|
||||||
/// and sha256sum against signed content-sha256
|
/// and sha256sum against signed content-sha256
|
||||||
fn ensure_checksum_matches(
|
fn ensure_checksum_matches(
|
||||||
md5sum: &[u8],
|
data_md5sum: &[u8],
|
||||||
sha256sum: garage_util::data::FixedBytes32,
|
data_sha256sum: garage_util::data::FixedBytes32,
|
||||||
content_md5: Option<&str>,
|
content_md5: Option<&str>,
|
||||||
content_sha256: Option<garage_util::data::FixedBytes32>,
|
content_sha256: Option<garage_util::data::FixedBytes32>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
if let Some(expected_sha256) = content_sha256 {
|
if let Some(expected_sha256) = content_sha256 {
|
||||||
if expected_sha256 != sha256sum {
|
if expected_sha256 != data_sha256sum {
|
||||||
return Err(Error::BadRequest(format!(
|
return Err(Error::BadRequest(format!(
|
||||||
"Unable to validate x-amz-content-sha256"
|
"Unable to validate x-amz-content-sha256"
|
||||||
)));
|
)));
|
||||||
|
@ -163,7 +163,7 @@ fn ensure_checksum_matches(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(expected_md5) = content_md5 {
|
if let Some(expected_md5) = content_md5 {
|
||||||
if expected_md5.trim_matches('"') != base64::encode(md5sum) {
|
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
|
||||||
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
|
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
|
||||||
} else {
|
} else {
|
||||||
trace!("Successfully validated content-md5");
|
trace!("Successfully validated content-md5");
|
||||||
|
@ -204,7 +204,7 @@ async fn read_and_put_blocks(
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
md5hasher.update(&block[..]);
|
md5hasher.update(&block[..]);
|
||||||
sha256hasher.input(&block[..]);
|
sha256hasher.input(&block[..]);
|
||||||
let block_hash = sha256sum(&block[..]);
|
let block_hash = blake2sum(&block[..]);
|
||||||
let block_len = block.len();
|
let block_len = block.len();
|
||||||
put_curr_version_block = put_block_meta(
|
put_curr_version_block = put_block_meta(
|
||||||
garage.clone(),
|
garage.clone(),
|
||||||
|
@ -222,14 +222,14 @@ async fn read_and_put_blocks(
|
||||||
}
|
}
|
||||||
|
|
||||||
let total_size = next_offset as u64;
|
let total_size = next_offset as u64;
|
||||||
let md5sum_arr = md5hasher.finalize();
|
let data_md5sum = md5hasher.finalize();
|
||||||
|
|
||||||
let sha256sum_arr = sha256hasher.result();
|
let data_sha256sum = sha256hasher.result();
|
||||||
let mut hash = [0u8; 32];
|
let mut hash = [0u8; 32];
|
||||||
hash.copy_from_slice(&sha256sum_arr[..]);
|
hash.copy_from_slice(&data_sha256sum[..]);
|
||||||
let sha256sum_arr = Hash::from(hash);
|
let data_sha256sum = Hash::from(hash);
|
||||||
|
|
||||||
Ok((total_size, md5sum_arr, sha256sum_arr))
|
Ok((total_size, data_md5sum, data_sha256sum))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn put_block_meta(
|
async fn put_block_meta(
|
||||||
|
@ -389,8 +389,8 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
// Copy block to store
|
// Copy block to store
|
||||||
let version = Version::new(version_uuid, bucket, key, false);
|
let version = Version::new(version_uuid, bucket, key, false);
|
||||||
let first_block_hash = sha256sum(&first_block[..]);
|
let first_block_hash = blake2sum(&first_block[..]);
|
||||||
let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
|
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
||||||
&garage,
|
&garage,
|
||||||
version,
|
version,
|
||||||
part_number,
|
part_number,
|
||||||
|
@ -401,14 +401,14 @@ pub async fn handle_put_part(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
ensure_checksum_matches(
|
ensure_checksum_matches(
|
||||||
md5sum_arr.as_slice(),
|
data_md5sum.as_slice(),
|
||||||
sha256sum,
|
data_sha256sum,
|
||||||
content_md5.as_deref(),
|
content_md5.as_deref(),
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let response = Response::builder()
|
let response = Response::builder()
|
||||||
.header("ETag", format!("\"{}\"", hex::encode(md5sum_arr)))
|
.header("ETag", format!("\"{}\"", hex::encode(data_md5sum)))
|
||||||
.body(Body::from(vec![]))
|
.body(Body::from(vec![]))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Ok(response)
|
Ok(response)
|
||||||
|
|
|
@ -176,7 +176,7 @@ impl BlockManager {
|
||||||
f.read_to_end(&mut data).await?;
|
f.read_to_end(&mut data).await?;
|
||||||
drop(f);
|
drop(f);
|
||||||
|
|
||||||
if data::sha256sum(&data[..]) != *hash {
|
if data::blake2sum(&data[..]) != *hash {
|
||||||
let _lock = self.data_dir_lock.lock().await;
|
let _lock = self.data_dir_lock.lock().await;
|
||||||
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
|
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
|
|
Loading…
Reference in a new issue