garage/src/api/s3_copy.rs

169 lines
4.9 KiB
Rust
Raw Normal View History

2020-04-28 10:18:14 +00:00
use std::sync::Arc;
2021-03-15 16:21:41 +01:00
use hyper::{Body, Request, Response};
2020-04-28 10:18:14 +00:00
use garage_table::*;
use garage_util::data::*;
2021-03-15 16:21:41 +01:00
use garage_util::time::*;
2020-04-28 10:18:14 +00:00
2020-07-07 13:59:22 +02:00
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
use garage_model::version_table::*;
2020-04-28 10:18:14 +00:00
2020-11-08 15:04:30 +01:00
use crate::error::*;
2021-03-15 16:21:41 +01:00
use crate::s3_put::get_headers;
use crate::s3_xml;
2020-11-08 15:04:30 +01:00
2020-04-28 10:18:14 +00:00
pub async fn handle_copy(
garage: Arc<Garage>,
2021-03-15 16:21:41 +01:00
req: &Request<Body>,
2021-12-14 13:55:11 +01:00
dest_bucket_id: Uuid,
2020-04-28 10:18:14 +00:00
dest_key: &str,
2021-12-14 13:55:11 +01:00
source_bucket_id: Uuid,
2020-04-28 10:18:14 +00:00
source_key: &str,
) -> Result<Response<Body>, Error> {
2020-11-11 16:12:42 +01:00
let source_object = garage
2020-04-28 10:18:14 +00:00
.object_table
2021-12-14 13:55:11 +01:00
.get(&source_bucket_id, &source_key.to_string())
2020-04-28 10:18:14 +00:00
.await?
2022-01-05 17:07:36 +01:00
.ok_or(Error::NoSuchKey)?;
2020-04-28 10:18:14 +00:00
2020-11-11 16:12:42 +01:00
let source_last_v = source_object
2020-04-28 10:18:14 +00:00
.versions()
.iter()
.rev()
2021-04-23 22:18:00 +02:00
.find(|v| v.is_complete())
2022-01-05 17:07:36 +01:00
.ok_or(Error::NoSuchKey)?;
2020-11-11 16:12:42 +01:00
2020-07-08 17:34:37 +02:00
let source_last_state = match &source_last_v.state {
ObjectVersionState::Complete(x) => x,
_ => unreachable!(),
};
2020-04-28 10:18:14 +00:00
let new_uuid = gen_uuid();
2021-03-15 15:26:29 +01:00
let new_timestamp = now_msec();
2020-04-28 10:18:14 +00:00
2021-03-15 16:21:41 +01:00
// Implement x-amz-metadata-directive: REPLACE
let old_meta = match source_last_state {
2020-04-28 10:18:14 +00:00
ObjectVersionData::DeleteMarker => {
2022-01-05 17:07:36 +01:00
return Err(Error::NoSuchKey);
2020-04-28 10:18:14 +00:00
}
2021-03-15 16:21:41 +01:00
ObjectVersionData::Inline(meta, _bytes) => meta,
ObjectVersionData::FirstBlock(meta, _fbh) => meta,
};
let new_meta = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
headers: get_headers(req)?,
size: old_meta.size,
etag: old_meta.etag.clone(),
},
_ => old_meta.clone(),
};
let etag = new_meta.etag.to_string();
2021-03-15 16:21:41 +01:00
// Save object copy
match source_last_state {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
new_meta,
bytes.clone(),
)),
};
let dest_object = Object::new(
2021-12-14 13:55:11 +01:00
dest_bucket_id,
2021-03-15 16:21:41 +01:00
dest_key.to_string(),
vec![dest_object_version],
);
2020-04-28 10:18:14 +00:00
garage.object_table.insert(&dest_object).await?;
}
2021-03-15 16:21:41 +01:00
ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
2021-03-15 15:26:29 +01:00
// Get block list from source version
2020-04-28 10:18:14 +00:00
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
.await?;
2022-01-05 17:07:36 +01:00
let source_version = source_version.ok_or(Error::NoSuchKey)?;
2020-04-28 10:18:14 +00:00
2021-03-15 15:26:29 +01:00
// Write an "uploading" marker in Object table
// This holds a reference to the object in the Version table
// so that it won't be deleted, e.g. by repair_versions.
let tmp_dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
2021-03-15 16:21:41 +01:00
state: ObjectVersionState::Uploading(new_meta.headers.clone()),
2021-03-15 15:26:29 +01:00
};
let tmp_dest_object = Object::new(
2021-12-14 13:55:11 +01:00
dest_bucket_id,
2021-03-15 15:26:29 +01:00
dest_key.to_string(),
vec![tmp_dest_object_version],
);
garage.object_table.insert(&tmp_dest_object).await?;
// Write version in the version table. Even with empty block list,
// this means that the BlockRef entries linked to this version cannot be
// marked as deleted (they are marked as deleted only if the Version
// doesn't exist or is marked as deleted).
2021-12-14 13:55:11 +01:00
let mut dest_version =
Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false);
2021-03-15 15:26:29 +01:00
garage.version_table.insert(&dest_version).await?;
// Fill in block list for version and insert block refs
for (bk, bv) in source_version.blocks.items().iter() {
dest_version.blocks.put(*bk, *bv);
}
2020-04-28 10:18:14 +00:00
let dest_block_refs = dest_version
.blocks
.items()
2020-04-28 10:18:14 +00:00
.iter()
.map(|b| BlockRef {
block: b.1.hash,
2020-04-28 10:18:14 +00:00
version: new_uuid,
deleted: false.into(),
2020-04-28 10:18:14 +00:00
})
.collect::<Vec<_>>();
futures::try_join!(
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
)?;
2021-03-15 15:26:29 +01:00
// Insert final object
// We do this last because otherwise there is a race condition in the case where
// the copy call has the same source and destination (this happens, rclone does
// it to update the modification timestamp for instance). If we did this concurrently
// with the stuff before, the block's reference counts could be decremented before
// they are incremented again for the new version, leading to data being deleted.
2021-03-15 16:21:41 +01:00
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
new_meta,
*first_block_hash,
)),
};
let dest_object = Object::new(
2021-12-14 13:55:11 +01:00
dest_bucket_id,
2021-03-15 16:21:41 +01:00
dest_key.to_string(),
vec![dest_object_version],
);
2021-03-15 15:26:29 +01:00
garage.object_table.insert(&dest_object).await?;
2020-04-28 10:18:14 +00:00
}
}
2021-03-15 16:21:41 +01:00
let last_modified = msec_to_rfc3339(new_timestamp);
let result = s3_xml::CopyObjectResult {
last_modified: s3_xml::Value(last_modified),
etag: s3_xml::Value(etag),
};
let xml = s3_xml::to_xml_with_header(&result)?;
2020-04-28 10:18:14 +00:00
Ok(Response::builder()
2021-02-23 18:46:25 +01:00
.header("Content-Type", "application/xml")
.body(Body::from(xml))?)
2020-04-28 10:18:14 +00:00
}