When saving block, delete .tmp file if we could not complete
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing
continuous-integration/drone Build is passing

This commit is contained in:
Alex 2023-01-03 17:29:11 +01:00
parent 0650a43cf1
commit 936b6cb563
Signed by: lx
GPG key ID: 0E496D15096376BE

View file

@ -6,6 +6,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use futures::Stream; use futures::Stream;
@ -649,14 +650,21 @@ impl BlockManagerLocked {
} }
}; };
let mut path2 = path.clone(); let mut path_tmp = path.clone();
path2.set_extension("tmp"); let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
let mut f = fs::File::create(&path2).await?; path_tmp.set_extension(tmp_extension);
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?; f.write_all(data).await?;
f.sync_all().await?; f.sync_all().await?;
drop(f); drop(f);
fs::rename(path2, path).await?; fs::rename(path_tmp, path).await?;
delete_on_drop.cancel();
if let Some(to_delete) = to_delete { if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?; fs::remove_file(to_delete).await?;
} }
@ -722,3 +730,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
.concat() .concat()
.into()) .into())
} }
struct DeleteOnDrop(Option<PathBuf>);
impl DeleteOnDrop {
fn cancel(&mut self) {
drop(self.0.take());
}
}
impl Drop for DeleteOnDrop {
fn drop(&mut self) {
if let Some(path) = self.0.take() {
tokio::spawn(async move {
if let Err(e) = fs::remove_file(&path).await {
debug!("DeleteOnDrop failed for {}: {}", path.display(), e);
}
});
}
}
}