forked from Deuxfleurs/garage
PutObject: better cleanup on Drop (incl. when request is interrupted in the middle)
This commit is contained in:
parent
4eb8ca3a52
commit
0650a43cf1
1 changed files with 59 additions and 34 deletions
|
@ -119,6 +119,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
return Ok((version_uuid, data_md5sum_hex));
|
return Ok((version_uuid, data_md5sum_hex));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The following consists in many steps that can each fail.
|
||||||
|
// Keep track that some cleanup will be needed if things fail
|
||||||
|
// before everything is finished (cleanup is done using the Drop trait).
|
||||||
|
let mut interrupted_cleanup = InterruptedCleanup(Some((
|
||||||
|
garage.clone(),
|
||||||
|
bucket.id,
|
||||||
|
key.into(),
|
||||||
|
version_uuid,
|
||||||
|
version_timestamp,
|
||||||
|
)));
|
||||||
|
|
||||||
// Write version identifier in object table so that we have a trace
|
// Write version identifier in object table so that we have a trace
|
||||||
// that we are uploading something
|
// that we are uploading something
|
||||||
let mut object_version = ObjectVersion {
|
let mut object_version = ObjectVersion {
|
||||||
|
@ -139,44 +150,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
// Transfer data and verify checksum
|
// Transfer data and verify checksum
|
||||||
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
||||||
|
|
||||||
let tx_result = (|| async {
|
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
||||||
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
&garage,
|
||||||
&garage,
|
&version,
|
||||||
&version,
|
1,
|
||||||
1,
|
first_block,
|
||||||
first_block,
|
first_block_hash,
|
||||||
first_block_hash,
|
&mut chunker,
|
||||||
&mut chunker,
|
)
|
||||||
)
|
.await?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
ensure_checksum_matches(
|
ensure_checksum_matches(
|
||||||
data_md5sum.as_slice(),
|
data_md5sum.as_slice(),
|
||||||
data_sha256sum,
|
data_sha256sum,
|
||||||
content_md5.as_deref(),
|
content_md5.as_deref(),
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, total_size).await?;
|
check_quotas(&garage, bucket, key, total_size).await?;
|
||||||
|
|
||||||
Ok((total_size, data_md5sum))
|
|
||||||
})()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// If something went wrong, clean up
|
|
||||||
let (total_size, md5sum_arr) = match tx_result {
|
|
||||||
Ok(rv) => rv,
|
|
||||||
Err(e) => {
|
|
||||||
// Mark object as aborted, this will free the blocks further down
|
|
||||||
object_version.state = ObjectVersionState::Aborted;
|
|
||||||
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
|
|
||||||
garage.object_table.insert(&object).await?;
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Save final object state, marked as Complete
|
// Save final object state, marked as Complete
|
||||||
let md5sum_hex = hex::encode(md5sum_arr);
|
let md5sum_hex = hex::encode(data_md5sum);
|
||||||
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
|
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
|
||||||
ObjectVersionMeta {
|
ObjectVersionMeta {
|
||||||
headers,
|
headers,
|
||||||
|
@ -188,6 +182,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
let object = Object::new(bucket.id, key.into(), vec![object_version]);
|
let object = Object::new(bucket.id, key.into(), vec![object_version]);
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
|
// We were not interrupted, everything went fine.
|
||||||
|
// We won't have to clean up on drop.
|
||||||
|
interrupted_cleanup.cancel();
|
||||||
|
|
||||||
Ok((version_uuid, md5sum_hex))
|
Ok((version_uuid, md5sum_hex))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
|
||||||
|
|
||||||
|
impl InterruptedCleanup {
|
||||||
|
fn cancel(&mut self) {
|
||||||
|
drop(self.0.take());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl Drop for InterruptedCleanup {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let object_version = ObjectVersion {
|
||||||
|
uuid: version_uuid,
|
||||||
|
timestamp: version_ts,
|
||||||
|
state: ObjectVersionState::Aborted,
|
||||||
|
};
|
||||||
|
let object = Object::new(bucket_id, key, vec![object_version]);
|
||||||
|
if let Err(e) = garage.object_table.insert(&object).await {
|
||||||
|
warn!("Cannot cleanup after aborted PutObject: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
pub async fn handle_create_multipart_upload(
|
pub async fn handle_create_multipart_upload(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
|
|
Loading…
Reference in a new issue