Fix #204 (full Multipart Uploads semantics) #553

Merged
lx merged 20 commits from nlnet-task1 into next 2023-06-09 15:34:10 +00:00
Showing only changes of commit bb176ebcb8 - Show all commits

View file

@ -119,75 +119,74 @@ impl AdminRpcHandler {
.get(&br.version, &EmptyKey) .get(&br.version, &EmptyKey)
.await? .await?
{ {
self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?; self.handle_block_purge_version_backlink(
&version,
&mut obj_dels,
&mut mpu_dels,
)
.await?;
if !version.deleted.get() { if !version.deleted.get() {
let deleted_version = let deleted_version = Version::new(version.uuid, version.backlink, true);
Version::new(version.uuid, version.backlink, true); self.garage.version_table.insert(&deleted_version).await?;
self.garage.version_table.insert(&deleted_version).await?; ver_dels += 1;
ver_dels += 1; }
} }
} }
} }
}
Ok(AdminRpc::Ok(format!( Ok(AdminRpc::Ok(format!(
"Purged {} blocks, {} versions, {} objects, {} multipart uploads", "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(), blocks.len(),
ver_dels, ver_dels,
obj_dels, obj_dels,
mpu_dels, mpu_dels,
))) )))
}
async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object{bucket_id, key} => {
(*bucket_id, key.clone(), version.uuid)
}
VersionBacklink::MultipartUpload{upload_id} => {
if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = self
.garage
.object_table
.get(&bucket_id, &key)
.await?
{
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(
ObjectVersionData::DeleteMarker,
),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
} }
async fn handle_block_purge_version_backlink(
&self,
version: &Version,
obj_dels: &mut usize,
mpu_dels: &mut usize,
) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
}
} }