PutObject: better cleanup when request is interrupted in the middle #462

Merged
lx merged 2 commits from interrupted-cleanup into main 2023-01-04 14:43:45 +00:00
2 changed files with 91 additions and 38 deletions

View file

@ -119,6 +119,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
return Ok((version_uuid, data_md5sum_hex)); return Ok((version_uuid, data_md5sum_hex));
} }
// The following consists in many steps that can each fail.
// Keep track that some cleanup will be needed if things fail
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some((
garage.clone(),
bucket.id,
key.into(),
version_uuid,
version_timestamp,
)));
// Write version identifier in object table so that we have a trace // Write version identifier in object table so that we have a trace
// that we are uploading something // that we are uploading something
let mut object_version = ObjectVersion { let mut object_version = ObjectVersion {
@ -139,7 +150,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum // Transfer data and verify checksum
let first_block_hash = async_blake2sum(first_block.clone()).await; let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage, &garage,
&version, &version,
@ -159,24 +169,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
check_quotas(&garage, bucket, key, total_size).await?; check_quotas(&garage, bucket, key, total_size).await?;
Ok((total_size, data_md5sum))
})()
.await;
// If something went wrong, clean up
let (total_size, md5sum_arr) = match tx_result {
Ok(rv) => rv,
Err(e) => {
// Mark object as aborted, this will free the blocks further down
object_version.state = ObjectVersionState::Aborted;
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
return Err(e);
}
};
// Save final object state, marked as Complete // Save final object state, marked as Complete
let md5sum_hex = hex::encode(md5sum_arr); let md5sum_hex = hex::encode(data_md5sum);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta { ObjectVersionMeta {
headers, headers,
@ -188,6 +182,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let object = Object::new(bucket.id, key.into(), vec![object_version]); let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
Ok((version_uuid, md5sum_hex)) Ok((version_uuid, md5sum_hex))
} }
@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap() .unwrap()
} }
struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
impl InterruptedCleanup {
fn cancel(&mut self) {
drop(self.0.take());
}
}
impl Drop for InterruptedCleanup {
fn drop(&mut self) {
if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
tokio::spawn(async move {
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: version_ts,
state: ObjectVersionState::Aborted,
};
let object = Object::new(bucket_id, key, vec![object_version]);
if let Err(e) = garage.object_table.insert(&object).await {
warn!("Cannot cleanup after aborted PutObject: {}", e);
}
});
}
}
}
// ----
pub async fn handle_create_multipart_upload( pub async fn handle_create_multipart_upload(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<Body>,

View file

@ -6,6 +6,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use futures::Stream; use futures::Stream;
@ -649,14 +650,21 @@ impl BlockManagerLocked {
} }
}; };
let mut path2 = path.clone(); let mut path_tmp = path.clone();
path2.set_extension("tmp"); let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
let mut f = fs::File::create(&path2).await?; path_tmp.set_extension(tmp_extension);
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?; f.write_all(data).await?;
f.sync_all().await?; f.sync_all().await?;
drop(f); drop(f);
fs::rename(path2, path).await?; fs::rename(path_tmp, path).await?;
delete_on_drop.cancel();
if let Some(to_delete) = to_delete { if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?; fs::remove_file(to_delete).await?;
} }
@ -722,3 +730,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
.concat() .concat()
.into()) .into())
} }
struct DeleteOnDrop(Option<PathBuf>);
impl DeleteOnDrop {
fn cancel(&mut self) {
drop(self.0.take());
}
}
impl Drop for DeleteOnDrop {
fn drop(&mut self) {
if let Some(path) = self.0.take() {
tokio::spawn(async move {
if let Err(e) = fs::remove_file(&path).await {
debug!("DeleteOnDrop failed for {}: {}", path.display(), e);
}
});
}
}
}