|
|
@ -5,9 +5,9 @@ use std::sync::Arc; |
|
|
|
use futures::stream::*; |
|
|
|
use hyper::{Body, Request, Response}; |
|
|
|
|
|
|
|
use garage_table::*; |
|
|
|
use garage_util::data::*; |
|
|
|
use garage_util::error::Error; |
|
|
|
use garage_table::*; |
|
|
|
|
|
|
|
use garage_core::block::INLINE_THRESHOLD; |
|
|
|
use garage_core::block_ref_table::*; |
|
|
@ -15,6 +15,7 @@ use garage_core::garage::Garage; |
|
|
|
use garage_core::object_table::*; |
|
|
|
use garage_core::version_table::*; |
|
|
|
|
|
|
|
use crate::encoding::*; |
|
|
|
use crate::http_util::*; |
|
|
|
|
|
|
|
pub async fn handle_put( |
|
|
@ -30,7 +31,7 @@ pub async fn handle_put( |
|
|
|
let mut chunker = BodyChunker::new(body, garage.config.block_size); |
|
|
|
let first_block = match chunker.next().await? { |
|
|
|
Some(x) => x, |
|
|
|
None => return Err(Error::BadRequest(format!("Empty body"))), |
|
|
|
None => vec![], |
|
|
|
}; |
|
|
|
|
|
|
|
let mut object_version = ObjectVersion { |
|
|
@ -58,7 +59,15 @@ pub async fn handle_put( |
|
|
|
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); |
|
|
|
garage.object_table.insert(&object).await?; |
|
|
|
|
|
|
|
let total_size = read_and_put_blocks(&garage, version, 1, first_block, first_block_hash, &mut chunker).await?; |
|
|
|
let total_size = read_and_put_blocks( |
|
|
|
&garage, |
|
|
|
version, |
|
|
|
1, |
|
|
|
first_block, |
|
|
|
first_block_hash, |
|
|
|
&mut chunker, |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
// TODO: if at any step we have an error, we should undo everything we did
|
|
|
|
|
|
|
@ -80,8 +89,14 @@ async fn read_and_put_blocks( |
|
|
|
chunker: &mut BodyChunker, |
|
|
|
) -> Result<u64, Error> { |
|
|
|
let mut next_offset = first_block.len(); |
|
|
|
let mut put_curr_version_block = |
|
|
|
put_block_meta(garage.clone(), &version, part_number, 0, first_block_hash, first_block.len() as u64); |
|
|
|
let mut put_curr_version_block = put_block_meta( |
|
|
|
garage.clone(), |
|
|
|
&version, |
|
|
|
part_number, |
|
|
|
0, |
|
|
|
first_block_hash, |
|
|
|
first_block.len() as u64, |
|
|
|
); |
|
|
|
let mut put_curr_block = garage |
|
|
|
.block_manager |
|
|
|
.rpc_put_block(first_block_hash, first_block); |
|
|
@ -92,8 +107,14 @@ async fn read_and_put_blocks( |
|
|
|
if let Some(block) = next_block { |
|
|
|
let block_hash = hash(&block[..]); |
|
|
|
let block_len = block.len(); |
|
|
|
put_curr_version_block = |
|
|
|
put_block_meta(garage.clone(), &version, part_number, next_offset as u64, block_hash, block_len as u64); |
|
|
|
put_curr_version_block = put_block_meta( |
|
|
|
garage.clone(), |
|
|
|
&version, |
|
|
|
part_number, |
|
|
|
next_offset as u64, |
|
|
|
block_hash, |
|
|
|
block_len as u64, |
|
|
|
); |
|
|
|
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); |
|
|
|
next_offset += block_len; |
|
|
|
} else { |
|
|
@ -232,8 +253,9 @@ pub async fn handle_put_part( |
|
|
|
.parse::<u64>() |
|
|
|
.map_err(|e| Error::BadRequest(format!("Invalid part number: {}", e)))?; |
|
|
|
|
|
|
|
let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; |
|
|
|
|
|
|
|
let version_uuid = |
|
|
|
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; |
|
|
|
|
|
|
|
// Read first chuck, and at the same time try to get object to see if it exists
|
|
|
|
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); |
|
|
|
|
|
|
@ -265,7 +287,15 @@ pub async fn handle_put_part( |
|
|
|
// Copy block to store
|
|
|
|
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); |
|
|
|
let first_block_hash = hash(&first_block[..]); |
|
|
|
read_and_put_blocks(&garage, version, part_number, first_block, first_block_hash, &mut chunker).await?; |
|
|
|
read_and_put_blocks( |
|
|
|
&garage, |
|
|
|
version, |
|
|
|
part_number, |
|
|
|
first_block, |
|
|
|
first_block_hash, |
|
|
|
&mut chunker, |
|
|
|
) |
|
|
|
.await?; |
|
|
|
|
|
|
|
Ok(Response::new(Box::new(BytesBody::from(vec![])))) |
|
|
|
} |
|
|
@ -277,7 +307,8 @@ pub async fn handle_complete_multipart_upload( |
|
|
|
key: &str, |
|
|
|
upload_id: &str, |
|
|
|
) -> Result<Response<BodyType>, Error> { |
|
|
|
let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; |
|
|
|
let version_uuid = |
|
|
|
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; |
|
|
|
|
|
|
|
let bucket = bucket.to_string(); |
|
|
|
let key = key.to_string(); |
|
|
@ -295,9 +326,11 @@ pub async fn handle_complete_multipart_upload( |
|
|
|
&& v.data == ObjectVersionData::Uploading |
|
|
|
}); |
|
|
|
let mut object_version = match object_version { |
|
|
|
None => return Err(Error::BadRequest(format!( |
|
|
|
"Multipart upload does not exist or has already been completed" |
|
|
|
))), |
|
|
|
None => { |
|
|
|
return Err(Error::BadRequest(format!( |
|
|
|
"Multipart upload does not exist or has already been completed" |
|
|
|
))) |
|
|
|
} |
|
|
|
Some(x) => x.clone(), |
|
|
|
}; |
|
|
|
let version = match version { |
|
|
@ -311,7 +344,11 @@ pub async fn handle_complete_multipart_upload( |
|
|
|
// TODO: check that all the parts that they pretend they gave us are indeed there
|
|
|
|
// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
|
|
|
|
|
|
|
|
let total_size = version.blocks().iter().map(|x| x.size).fold(0, |x, y| x+y); |
|
|
|
let total_size = version |
|
|
|
.blocks() |
|
|
|
.iter() |
|
|
|
.map(|x| x.size) |
|
|
|
.fold(0, |x, y| x + y); |
|
|
|
object_version.size = total_size; |
|
|
|
object_version.state = ObjectVersionState::Complete; |
|
|
|
object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash); |
|
|
@ -325,7 +362,12 @@ pub async fn handle_complete_multipart_upload( |
|
|
|
r#"<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"# |
|
|
|
) |
|
|
|
.unwrap(); |
|
|
|
writeln!(&mut xml, "\t<Location>{}</Location>", garage.config.s3_api.s3_region).unwrap(); |
|
|
|
writeln!( |
|
|
|
&mut xml, |
|
|
|
"\t<Location>{}</Location>", |
|
|
|
garage.config.s3_api.s3_region |
|
|
|
) |
|
|
|
.unwrap(); |
|
|
|
writeln!(&mut xml, "\t<Bucket>{}</Bucket>", bucket).unwrap(); |
|
|
|
writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap(); |
|
|
|
writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap(); |
|
|
@ -339,9 +381,13 @@ pub async fn handle_abort_multipart_upload( |
|
|
|
key: &str, |
|
|
|
upload_id: &str, |
|
|
|
) -> Result<Response<BodyType>, Error> { |
|
|
|
let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; |
|
|
|
let version_uuid = |
|
|
|
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; |
|
|
|
|
|
|
|
let object = garage.object_table.get(&bucket.to_string(), &key.to_string()).await?; |
|
|
|
let object = garage |
|
|
|
.object_table |
|
|
|
.get(&bucket.to_string(), &key.to_string()) |
|
|
|
.await?; |
|
|
|
let object = match object { |
|
|
|
None => return Err(Error::BadRequest(format!("Object not found"))), |
|
|
|
Some(x) => x, |
|
|
@ -352,9 +398,11 @@ pub async fn handle_abort_multipart_upload( |
|
|
|
&& v.data == ObjectVersionData::Uploading |
|
|
|
}); |
|
|
|
let mut object_version = match object_version { |
|
|
|
None => return Err(Error::BadRequest(format!( |
|
|
|
"Multipart upload does not exist or has already been completed" |
|
|
|
))), |
|
|
|
None => { |
|
|
|
return Err(Error::BadRequest(format!( |
|
|
|
"Multipart upload does not exist or has already been completed" |
|
|
|
))) |
|
|
|
} |
|
|
|
Some(x) => x.clone(), |
|
|
|
}; |
|
|
|
|
|
|
@ -383,50 +431,3 @@ fn uuid_from_str(id: &str) -> Result<UUID, ()> { |
|
|
|
uuid.copy_from_slice(&id_bin[..]); |
|
|
|
Ok(UUID::from(uuid)) |
|
|
|
} |
|
|
|
|
|
|
|
pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<UUID, Error> { |
|
|
|
let object = match garage |
|
|
|
.object_table |
|
|
|
.get(&bucket.to_string(), &key.to_string()) |
|
|
|
.await? |
|
|
|
{ |
|
|
|
None => { |
|
|
|
// No need to delete
|
|
|
|
return Ok([0u8; 32].into()); |
|
|
|
} |
|
|
|
Some(o) => o, |
|
|
|
}; |
|
|
|
|
|
|
|
let interesting_versions = object.versions().iter().filter(|v| { |
|
|
|
v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted |
|
|
|
}); |
|
|
|
|
|
|
|
let mut must_delete = false; |
|
|
|
let mut timestamp = now_msec(); |
|
|
|
for v in interesting_versions { |
|
|
|
must_delete = true; |
|
|
|
timestamp = std::cmp::max(timestamp, v.timestamp + 1); |
|
|
|
} |
|
|
|
|
|
|
|
if !must_delete { |
|
|
|
return Ok([0u8; 32].into()); |
|
|
|
} |
|
|
|
|
|
|
|
let version_uuid = gen_uuid(); |
|
|
|
|
|
|
|
let object = Object::new( |
|
|
|
bucket.into(), |
|
|
|
key.into(), |
|
|
|
vec![ObjectVersion { |
|
|
|
uuid: version_uuid, |
|
|
|
timestamp: now_msec(), |
|
|
|
mime_type: "application/x-delete-marker".into(), |
|
|
|
size: 0, |
|
|
|
state: ObjectVersionState::Complete, |
|
|
|
data: ObjectVersionData::DeleteMarker, |
|
|
|
}], |
|
|
|
); |
|
|
|
|
|
|
|
garage.object_table.insert(&object).await?; |
|
|
|
return Ok(version_uuid); |
|
|
|
} |
|
|
|