WIP add content defined chunking #42
5 changed files with 79 additions and 58 deletions
|
@ -41,45 +41,64 @@ pub async fn handle_copy(
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_uuid = gen_uuid();
|
let new_uuid = gen_uuid();
|
||||||
|
let new_timestamp = now_msec();
|
||||||
let dest_object_version = ObjectVersion {
|
let dest_object_version = ObjectVersion {
|
||||||
uuid: new_uuid,
|
uuid: new_uuid,
|
||||||
timestamp: now_msec(),
|
timestamp: new_timestamp,
|
||||||
state: ObjectVersionState::Complete(source_last_state.clone()),
|
state: ObjectVersionState::Complete(source_last_state.clone()),
|
||||||
};
|
};
|
||||||
|
|
||||||
match &source_last_state {
|
|
||||||
ObjectVersionData::DeleteMarker => {
|
|
||||||
return Err(Error::NotFound);
|
|
||||||
}
|
|
||||||
ObjectVersionData::Inline(_meta, _bytes) => {
|
|
||||||
let dest_object = Object::new(
|
let dest_object = Object::new(
|
||||||
dest_bucket.to_string(),
|
dest_bucket.to_string(),
|
||||||
dest_key.to_string(),
|
dest_key.to_string(),
|
||||||
vec![dest_object_version],
|
vec![dest_object_version],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
match source_last_state {
|
||||||
|
ObjectVersionData::DeleteMarker => {
|
||||||
|
return Err(Error::NotFound);
|
||||||
|
}
|
||||||
|
ObjectVersionData::Inline(_meta, _bytes) => {
|
||||||
garage.object_table.insert(&dest_object).await?;
|
garage.object_table.insert(&dest_object).await?;
|
||||||
}
|
}
|
||||||
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
|
ObjectVersionData::FirstBlock(meta, _first_block_hash) => {
|
||||||
|
// Get block list from source version
|
||||||
let source_version = garage
|
let source_version = garage
|
||||||
.version_table
|
.version_table
|
||||||
.get(&source_last_v.uuid, &EmptyKey)
|
.get(&source_last_v.uuid, &EmptyKey)
|
||||||
.await?;
|
.await?;
|
||||||
let source_version = source_version.ok_or(Error::NotFound)?;
|
let source_version = source_version.ok_or(Error::NotFound)?;
|
||||||
|
|
||||||
|
// Write an "uploading" marker in Object table
|
||||||
|
// This holds a reference to the object in the Version table
|
||||||
|
// so that it won't be deleted, e.g. by repair_versions.
|
||||||
|
let tmp_dest_object_version = ObjectVersion {
|
||||||
|
uuid: new_uuid,
|
||||||
|
timestamp: new_timestamp,
|
||||||
|
state: ObjectVersionState::Uploading(meta.headers.clone()),
|
||||||
|
};
|
||||||
|
let tmp_dest_object = Object::new(
|
||||||
|
dest_bucket.to_string(),
|
||||||
|
dest_key.to_string(),
|
||||||
|
vec![tmp_dest_object_version],
|
||||||
|
);
|
||||||
|
garage.object_table.insert(&tmp_dest_object).await?;
|
||||||
|
|
||||||
|
// Write version in the version table. Even with empty block list,
|
||||||
|
// this means that the BlockRef entries linked to this version cannot be
|
||||||
|
// marked as deleted (they are marked as deleted only if the Version
|
||||||
|
// doesn't exist or is marked as deleted).
|
||||||
let mut dest_version = Version::new(
|
let mut dest_version = Version::new(
|
||||||
new_uuid,
|
new_uuid,
|
||||||
dest_bucket.to_string(),
|
dest_bucket.to_string(),
|
||||||
dest_key.to_string(),
|
dest_key.to_string(),
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
|
garage.version_table.insert(&dest_version).await?;
|
||||||
|
|
||||||
|
// Fill in block list for version and insert block refs
|
||||||
for (bk, bv) in source_version.blocks.items().iter() {
|
for (bk, bv) in source_version.blocks.items().iter() {
|
||||||
dest_version.blocks.put(*bk, *bv);
|
dest_version.blocks.put(*bk, *bv);
|
||||||
}
|
}
|
||||||
let dest_object = Object::new(
|
|
||||||
dest_bucket.to_string(),
|
|
||||||
dest_key.to_string(),
|
|
||||||
vec![dest_object_version],
|
|
||||||
);
|
|
||||||
let dest_block_refs = dest_version
|
let dest_block_refs = dest_version
|
||||||
.blocks
|
.blocks
|
||||||
.items()
|
.items()
|
||||||
|
@ -91,14 +110,21 @@ pub async fn handle_copy(
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
garage.object_table.insert(&dest_object),
|
|
||||||
garage.version_table.insert(&dest_version),
|
garage.version_table.insert(&dest_version),
|
||||||
garage.block_ref_table.insert_many(&dest_block_refs[..]),
|
garage.block_ref_table.insert_many(&dest_block_refs[..]),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Insert final object
|
||||||
|
// We do this last because otherwise there is a race condition in the case where
|
||||||
|
// the copy call has the same source and destination (this happens, rclone does
|
||||||
|
// it to update the modification timestamp for instance). If we did this concurrently
|
||||||
|
// with the stuff before, the block's reference counts could be decremented before
|
||||||
|
// they are incremented again for the new version, leading to data being deleted.
|
||||||
|
garage.object_table.insert(&dest_object).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let now = Utc::now();
|
let now = Utc::now(); // FIXME use the unix timestamp from above
|
||||||
let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
|
let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
|
||||||
let mut xml = String::new();
|
let mut xml = String::new();
|
||||||
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
|
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
|
||||||
|
|
|
@ -29,16 +29,16 @@ async fn handle_delete_internal(
|
||||||
_ => true,
|
_ => true,
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut must_delete = None;
|
let mut version_to_delete = None;
|
||||||
let mut timestamp = now_msec();
|
let mut timestamp = now_msec();
|
||||||
for v in interesting_versions {
|
for v in interesting_versions {
|
||||||
if v.timestamp + 1 > timestamp || must_delete.is_none() {
|
if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
|
||||||
must_delete = Some(v.uuid);
|
version_to_delete = Some(v.uuid);
|
||||||
}
|
}
|
||||||
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
|
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
let deleted_version = must_delete.ok_or(Error::NotFound)?;
|
let deleted_version = version_to_delete.ok_or(Error::NotFound)?;
|
||||||
|
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
|
|
||||||
|
|
|
@ -87,17 +87,21 @@ pub async fn handle_put(
|
||||||
// that we are uploading something
|
// that we are uploading something
|
||||||
let mut object_version = ObjectVersion {
|
let mut object_version = ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: version_uuid,
|
||||||
timestamp: now_msec(),
|
timestamp: version_timestamp,
|
||||||
state: ObjectVersionState::Uploading(headers.clone()),
|
state: ObjectVersionState::Uploading(headers.clone()),
|
||||||
};
|
};
|
||||||
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
|
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
// Initialize corresponding entry in version table
|
// Initialize corresponding entry in version table
|
||||||
|
// Write this entry now, even with empty block list,
|
||||||
|
// to prevent block_ref entries from being deleted (they can be deleted
|
||||||
|
// if the reference a version that isn't found in the version table)
|
||||||
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
|
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
|
||||||
let first_block_hash = blake2sum(&first_block[..]);
|
garage.version_table.insert(&version).await?;
|
||||||
|
|
||||||
// Transfer data and verify checksum
|
// Transfer data and verify checksum
|
||||||
|
let first_block_hash = blake2sum(&first_block[..]);
|
||||||
let tx_result = read_and_put_blocks(
|
let tx_result = read_and_put_blocks(
|
||||||
&garage,
|
&garage,
|
||||||
&version,
|
&version,
|
||||||
|
@ -173,7 +177,7 @@ fn ensure_checksum_matches(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_and_put_blocks(
|
async fn read_and_put_blocks(
|
||||||
garage: &Arc<Garage>,
|
garage: &Garage,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
first_block: Vec<u8>,
|
first_block: Vec<u8>,
|
||||||
|
@ -187,7 +191,7 @@ async fn read_and_put_blocks(
|
||||||
|
|
||||||
let mut next_offset = first_block.len();
|
let mut next_offset = first_block.len();
|
||||||
let mut put_curr_version_block = put_block_meta(
|
let mut put_curr_version_block = put_block_meta(
|
||||||
garage.clone(),
|
&garage,
|
||||||
&version,
|
&version,
|
||||||
part_number,
|
part_number,
|
||||||
0,
|
0,
|
||||||
|
@ -207,7 +211,7 @@ async fn read_and_put_blocks(
|
||||||
let block_hash = blake2sum(&block[..]);
|
let block_hash = blake2sum(&block[..]);
|
||||||
let block_len = block.len();
|
let block_len = block.len();
|
||||||
put_curr_version_block = put_block_meta(
|
put_curr_version_block = put_block_meta(
|
||||||
garage.clone(),
|
&garage,
|
||||||
&version,
|
&version,
|
||||||
part_number,
|
part_number,
|
||||||
next_offset as u64,
|
next_offset as u64,
|
||||||
|
@ -231,14 +235,13 @@ async fn read_and_put_blocks(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn put_block_meta(
|
async fn put_block_meta(
|
||||||
garage: Arc<Garage>,
|
garage: &Garage,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
offset: u64,
|
offset: u64,
|
||||||
hash: Hash,
|
hash: Hash,
|
||||||
size: u64,
|
size: u64,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
// TODO: don't clone, restart from empty block list ??
|
|
||||||
let mut version = version.clone();
|
let mut version = version.clone();
|
||||||
version.blocks.put(
|
version.blocks.put(
|
||||||
VersionBlockKey {
|
VersionBlockKey {
|
||||||
|
@ -316,6 +319,7 @@ pub async fn handle_create_multipart_upload(
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let headers = get_headers(req)?;
|
let headers = get_headers(req)?;
|
||||||
|
|
||||||
|
// Create object in object table
|
||||||
let object_version = ObjectVersion {
|
let object_version = ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: version_uuid,
|
||||||
timestamp: now_msec(),
|
timestamp: now_msec(),
|
||||||
|
@ -324,6 +328,14 @@ pub async fn handle_create_multipart_upload(
|
||||||
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
|
let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
|
// Insert empty version so that block_ref entries refer to something
|
||||||
|
// (they are inserted concurrently with blocks in the version table, so
|
||||||
|
// there is the possibility that they are inserted before the version table
|
||||||
|
// is created, in which case it is allowed to delete them, e.g. in repair_*)
|
||||||
|
let version = Version::new(version_uuid, bucket.into(), key.into(), false);
|
||||||
|
garage.version_table.insert(&version).await?;
|
||||||
|
|
||||||
|
// Send success response
|
||||||
let mut xml = String::new();
|
let mut xml = String::new();
|
||||||
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
|
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
|
||||||
writeln!(
|
writeln!(
|
||||||
|
@ -450,14 +462,12 @@ pub async fn handle_complete_multipart_upload(
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
|
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
|
||||||
let object_version = object
|
let mut object_version = object
|
||||||
.versions()
|
.versions()
|
||||||
.iter()
|
.iter()
|
||||||
.find(|v| v.uuid == version_uuid && v.is_uploading());
|
.find(|v| v.uuid == version_uuid && v.is_uploading())
|
||||||
let mut object_version = match object_version {
|
.cloned()
|
||||||
None => return Err(Error::NotFound),
|
.ok_or(Error::BadRequest(format!("Version not found")))?;
|
||||||
Some(x) => x.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
|
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
|
||||||
if version.blocks.len() == 0 {
|
if version.blocks.len() == 0 {
|
||||||
|
@ -498,12 +508,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
|
let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts);
|
||||||
|
|
||||||
// Calculate total size of final object
|
// Calculate total size of final object
|
||||||
let total_size = version
|
let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
|
||||||
.blocks
|
|
||||||
.items()
|
|
||||||
.iter()
|
|
||||||
.map(|x| x.1.size)
|
|
||||||
.fold(0, |x, y| x + y);
|
|
||||||
|
|
||||||
// Write final object version
|
// Write final object version
|
||||||
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
|
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
|
||||||
|
|
|
@ -82,13 +82,7 @@ impl Repair {
|
||||||
.versions()
|
.versions()
|
||||||
.iter()
|
.iter()
|
||||||
.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
|
.any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
|
||||||
None => {
|
None => false,
|
||||||
warn!(
|
|
||||||
"Repair versions: object for version {:?} not found, skipping.",
|
|
||||||
version
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
if !version_exists {
|
if !version_exists {
|
||||||
info!("Repair versions: marking version as deleted: {:?}", version);
|
info!("Repair versions: marking version as deleted: {:?}", version);
|
||||||
|
@ -127,16 +121,8 @@ impl Repair {
|
||||||
.version_table
|
.version_table
|
||||||
.get(&block_ref.version, &EmptyKey)
|
.get(&block_ref.version, &EmptyKey)
|
||||||
.await?;
|
.await?;
|
||||||
let ref_exists = match version {
|
// The version might not exist if it has been GC'ed
|
||||||
Some(v) => !v.deleted.get(),
|
let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
|
||||||
None => {
|
|
||||||
warn!(
|
|
||||||
"Block ref repair: version for block ref {:?} not found, skipping.",
|
|
||||||
block_ref
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if !ref_exists {
|
if !ref_exists {
|
||||||
info!(
|
info!(
|
||||||
"Repair block ref: marking block_ref as deleted: {:?}",
|
"Repair block ref: marking block_ref as deleted: {:?}",
|
||||||
|
|
|
@ -28,6 +28,7 @@ use crate::garage::Garage;
|
||||||
pub const INLINE_THRESHOLD: usize = 3072;
|
pub const INLINE_THRESHOLD: usize = 3072;
|
||||||
|
|
||||||
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
|
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
|
||||||
|
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
|
||||||
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
|
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
|
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
@ -175,7 +176,10 @@ impl BlockManager {
|
||||||
|
|
||||||
if data::blake2sum(&data[..]) != *hash {
|
if data::blake2sum(&data[..]) != *hash {
|
||||||
let _lock = self.data_dir_lock.lock().await;
|
let _lock = self.data_dir_lock.lock().await;
|
||||||
warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash);
|
warn!(
|
||||||
|
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
|
||||||
|
hash
|
||||||
|
);
|
||||||
let mut path2 = path.clone();
|
let mut path2 = path.clone();
|
||||||
path2.set_extension(".corrupted");
|
path2.set_extension(".corrupted");
|
||||||
fs::rename(path, path2).await?;
|
fs::rename(path, path2).await?;
|
||||||
|
@ -225,7 +229,7 @@ impl BlockManager {
|
||||||
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let new_rc = self.rc.merge(&hash, vec![0])?;
|
let new_rc = self.rc.merge(&hash, vec![0])?;
|
||||||
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
|
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
|
||||||
self.put_to_resync(&hash, Duration::from_secs(0))?;
|
self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue