From 64a6eda0d2bb6aae6a6eef60032ab24ae3c37a88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Jul 2020 17:33:24 +0200 Subject: [PATCH] Migrate S3 api to use new model --- src/api/s3_copy.rs | 20 +++++------- src/api/s3_delete.rs | 11 ++++--- src/api/s3_get.rs | 59 +++++++++++++++++++++--------------- src/api/s3_list.rs | 8 ++++- src/api/s3_put.rs | 72 +++++++++++++++++++++++++++----------------- 5 files changed, 100 insertions(+), 70 deletions(-) diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index 0af1b13a3..b54701e2f 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -39,27 +39,23 @@ pub async fn handle_copy( Some(v) => v, None => return Err(Error::NotFound), }; + let source_last_state = match &source_last_v.state { + ObjectVersionState::Complete(x) => x, + _ => unreachable!(), + }; let new_uuid = gen_uuid(); let dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: now_msec(), - mime_type: source_last_v.mime_type.clone(), - size: source_last_v.size, - state: ObjectVersionState::Complete, - data: source_last_v.data.clone(), + state: ObjectVersionState::Complete(source_last_state.clone()), }; - match &source_last_v.data { - ObjectVersionData::Uploading => { - return Err(Error::Message(format!( - "Version is_complete() but data is stil Uploading (internal error)" - ))); - } + match &source_last_state { ObjectVersionData::DeleteMarker => { return Err(Error::NotFound); } - ObjectVersionData::Inline(_bytes) => { + ObjectVersionData::Inline(_meta, _bytes) => { let dest_object = Object::new( dest_bucket.to_string(), dest_key.to_string(), @@ -67,7 +63,7 @@ pub async fn handle_copy( ); garage.object_table.insert(&dest_object).await?; } - ObjectVersionData::FirstBlock(_first_block_hash) => { + ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { let source_version = garage .version_table .get(&source_last_v.uuid, &EmptyKey) diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 949ad6a15..c5cd5970c 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -29,7 +29,11 @@ async fn handle_delete_internal( }; let interesting_versions = object.versions().iter().filter(|v| { - v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted + match v.state { + ObjectVersionState::Aborted => false, + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, + _ => true, + } }); let mut must_delete = None; @@ -54,10 +58,7 @@ async fn handle_delete_internal( vec![ObjectVersion { uuid: version_uuid, timestamp: now_msec(), - mime_type: "application/x-delete-marker".into(), - size: 0, - state: ObjectVersionState::Complete, - data: ObjectVersionData::DeleteMarker, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 42cf55d1a..25b3d3e31 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -12,13 +12,14 @@ use garage_table::EmptyKey; use garage_model::garage::Garage; use garage_model::object_table::*; -fn object_headers(version: &ObjectVersion) -> http::response::Builder { +fn object_headers(version: &ObjectVersion, version_meta: &ObjectVersionMeta) -> http::response::Builder { let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); let date_str = httpdate::fmt_http_date(date); Response::builder() - .header("Content-Type", version.mime_type.to_string()) - .header("Content-Length", format!("{}", version.size)) + .header("Content-Type", version_meta.headers.content_type.to_string()) + .header("Content-Length", format!("{}", version_meta.size)) + .header("ETag", version_meta.etag.to_string()) .header("Last-Modified", date_str) .header("Accept-Ranges", format!("bytes")) } @@ -47,9 +48,14 @@ pub async fn handle_head( Some(v) => v, None => return Err(Error::NotFound), }; + let version_meta = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, + _ => unreachable!(), + }; let body: Body = Body::from(vec![]); - let response = object_headers(&version) + let response = object_headers(&version, version_meta) .status(StatusCode::OK) .body(body) .unwrap(); @@ -81,13 +87,22 @@ pub async fn handle_get( Some(v) => v, None => return Err(Error::NotFound), }; + let last_v_data = match &last_v.state { + ObjectVersionState::Complete(x) => x, + _ => unreachable!(), + }; + let last_v_meta = match last_v_data { + ObjectVersionData::DeleteMarker => return Err(Error::NotFound), + ObjectVersionData::Inline(meta, _) => meta, + ObjectVersionData::FirstBlock(meta, _) => meta, + }; let range = match req.headers().get("range") { Some(range) => { let range_str = range .to_str() .map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?; - let mut ranges = http_range::HttpRange::parse(range_str, last_v.size) + let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size) .map_err(|_e| Error::BadRequest(format!("Invalid range")))?; if ranges.len() > 1 { return Err(Error::BadRequest(format!("Multiple ranges not supported"))); @@ -98,21 +113,18 @@ pub async fn handle_get( None => None, }; if let Some(range) = range { - return handle_get_range(garage, last_v, range.start, range.start + range.length).await; + return handle_get_range(garage, last_v, last_v_data, last_v_meta, range.start, range.start + range.length).await; } - let resp_builder = object_headers(&last_v).status(StatusCode::OK); + let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK); - match &last_v.data { - ObjectVersionData::Uploading => Err(Error::Message(format!( - "Version is_complete() but data is stil Uploading (internal error)" - ))), - ObjectVersionData::DeleteMarker => Err(Error::NotFound), - ObjectVersionData::Inline(bytes) => { + match &last_v_data { + ObjectVersionData::DeleteMarker => unreachable!(), + ObjectVersionData::Inline(_, bytes) => { let body: Body = Body::from(bytes.to_vec()); Ok(resp_builder.body(body)?) } - ObjectVersionData::FirstBlock(first_block_hash) => { + ObjectVersionData::FirstBlock(_, first_block_hash) => { let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); @@ -155,26 +167,25 @@ pub async fn handle_get( pub async fn handle_get_range( garage: Arc, version: &ObjectVersion, + version_data: &ObjectVersionData, + version_meta: &ObjectVersionMeta, begin: u64, end: u64, ) -> Result, Error> { - if end > version.size { + if end > version_meta.size { return Err(Error::BadRequest(format!("Range not included in file"))); } - let resp_builder = object_headers(&version) + let resp_builder = object_headers(version, version_meta) .header( "Content-Range", - format!("bytes {}-{}/{}", begin, end, version.size), + format!("bytes {}-{}/{}", begin, end, version_meta.size), ) .status(StatusCode::PARTIAL_CONTENT); - match &version.data { - ObjectVersionData::Uploading => Err(Error::Message(format!( - "Version is_complete() but data is stil Uploading (internal error)" - ))), - ObjectVersionData::DeleteMarker => Err(Error::NotFound), - ObjectVersionData::Inline(bytes) => { + match &version_data { + ObjectVersionData::DeleteMarker => unreachable!(), + ObjectVersionData::Inline(_meta, bytes) => { if end as usize <= bytes.len() { let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); Ok(resp_builder.body(body)?) @@ -182,7 +193,7 @@ pub async fn handle_get_range( Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been"))) } } - ObjectVersionData::FirstBlock(_first_block_hash) => { + ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { let version = garage.version_table.get(&version.uuid, &EmptyKey).await?; let version = match version { Some(v) => v, diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 1f0eccf5f..3fca13480 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -8,6 +8,7 @@ use hyper::{Body, Response}; use garage_util::error::Error; use garage_model::garage::Garage; +use garage_model::object_table::*; use crate::encoding::*; @@ -73,10 +74,15 @@ pub async fn handle_list( if let Some(pfx) = common_prefix { result_common_prefixes.insert(pfx.to_string()); } else { + let size = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta.size, + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => unreachable!(), + }; let info = match result_keys.get(&object.key) { None => ListResultInfo { last_modified: version.timestamp, - size: version.size, + size, }, Some(_lri) => { return Err(Error::Message(format!("Duplicate key?? {}", object.key))) diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c5d0a31c8..756221681 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{VecDeque, BTreeMap}; use std::fmt::Write; use std::sync::Arc; @@ -24,7 +24,11 @@ pub async fn handle_put( key: &str, ) -> Result, Error> { let version_uuid = gen_uuid(); - let mime_type = get_mime_type(&req)?; + let headers = ObjectVersionHeaders{ + content_type: get_mime_type(&req)?, + other: BTreeMap::new(), // TODO + }; + let body = req.into_body(); let mut chunker = BodyChunker::new(body, garage.config.block_size); @@ -36,15 +40,17 @@ pub async fn handle_put( let mut object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), - mime_type, - size: first_block.len() as u64, - state: ObjectVersionState::Uploading, - data: ObjectVersionData::Uploading, + state: ObjectVersionState::Uploading(headers.clone()), }; if first_block.len() < INLINE_THRESHOLD { - object_version.data = ObjectVersionData::Inline(first_block); - object_version.state = ObjectVersionState::Complete; + object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline( + ObjectVersionMeta{ + headers, + size: first_block.len() as u64, + etag: "".to_string(), // TODO + }, + first_block)); let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -54,7 +60,6 @@ pub async fn handle_put( let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let first_block_hash = hash(&first_block[..]); - object_version.data = ObjectVersionData::FirstBlock(first_block_hash); let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; @@ -70,8 +75,13 @@ pub async fn handle_put( // TODO: if at any step we have an error, we should undo everything we did - object_version.state = ObjectVersionState::Complete; - object_version.size = total_size; + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( + ObjectVersionMeta{ + headers, + size: total_size, + etag: "".to_string(), // TODO + }, + first_block_hash)); let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -197,6 +207,7 @@ impl BodyChunker { pub fn put_response(version_uuid: UUID) -> Response { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) + // TODO ETag .body(Body::from(vec![])) .unwrap() } @@ -208,15 +219,15 @@ pub async fn handle_create_multipart_upload( key: &str, ) -> Result, Error> { let version_uuid = gen_uuid(); - let mime_type = get_mime_type(req)?; + let headers = ObjectVersionHeaders{ + content_type: get_mime_type(&req)?, + other: BTreeMap::new(), // TODO + }; let object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), - mime_type, - size: 0, - state: ObjectVersionState::Uploading, - data: ObjectVersionData::Uploading, + state: ObjectVersionState::Uploading(headers), }; let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -276,9 +287,7 @@ pub async fn handle_put_part( Some(x) => x, }; if !object.versions().iter().any(|v| { - v.uuid == version_uuid - && v.state == ObjectVersionState::Uploading - && v.data == ObjectVersionData::Uploading + v.uuid == version_uuid && v.is_uploading() }) { return Err(Error::BadRequest(format!( "Multipart upload does not exist or is otherwise invalid" @@ -322,9 +331,7 @@ pub async fn handle_complete_multipart_upload( Some(x) => x, }; let object_version = object.versions().iter().find(|v| { - v.uuid == version_uuid - && v.state == ObjectVersionState::Uploading - && v.data == ObjectVersionData::Uploading + v.uuid == version_uuid && v.is_uploading() }); let mut object_version = match object_version { None => { @@ -341,6 +348,10 @@ pub async fn handle_complete_multipart_upload( if version.blocks().len() == 0 { return Err(Error::BadRequest(format!("No data was uploaded"))); } + let headers = match object_version.state { + ObjectVersionState::Uploading(headers) => headers.clone(), + _ => unreachable!(), + }; // TODO: check that all the parts that they pretend they gave us are indeed there // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... @@ -350,9 +361,16 @@ pub async fn handle_complete_multipart_upload( .iter() .map(|x| x.size) .fold(0, |x, y| x + y); - object_version.size = total_size; - object_version.state = ObjectVersionState::Complete; - object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash); + object_version.state = ObjectVersionState::Complete( + ObjectVersionData::FirstBlock( + ObjectVersionMeta{ + headers, + size: total_size, + etag: "".to_string(),// TODO + }, + version.blocks()[0].hash) + ); + let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; @@ -394,9 +412,7 @@ pub async fn handle_abort_multipart_upload( Some(x) => x, }; let object_version = object.versions().iter().find(|v| { - v.uuid == version_uuid - && v.state == ObjectVersionState::Uploading - && v.data == ObjectVersionData::Uploading + v.uuid == version_uuid && v.is_uploading() }); let mut object_version = match object_version { None => {