From c3bd672d58d32c8fc3b3225bfc2bfb5330ec726e Mon Sep 17 00:00:00 2001
From: Alex Auvolat <alex@adnab.me>
Date: Wed, 7 Apr 2021 10:00:47 +0200
Subject: [PATCH] Reorganize code in s3_put.rs

---
 src/api/s3_put.rs | 447 +++++++++++++++++++++++-----------------------
 1 file changed, 226 insertions(+), 221 deletions(-)

diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index d023bcef..50362b28 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -23,6 +23,8 @@ use crate::encoding::*;
 use crate::error::*;
 use crate::signature::verify_signed_content;
 
+// ---- PutObject call ----
+
 pub async fn handle_put(
 	garage: Arc<Garage>,
 	req: Request<Body>,
@@ -151,174 +153,6 @@ pub async fn handle_put(
 	Ok(put_response(version_uuid, md5sum_hex))
 }
 
-/// Validate MD5 sum against content-md5 header
-/// and sha256sum against signed content-sha256
-fn ensure_checksum_matches(
-	data_md5sum: &[u8],
-	data_sha256sum: garage_util::data::FixedBytes32,
-	content_md5: Option<&str>,
-	content_sha256: Option<garage_util::data::FixedBytes32>,
-) -> Result<(), Error> {
-	if let Some(expected_sha256) = content_sha256 {
-		if expected_sha256 != data_sha256sum {
-			return Err(Error::BadRequest(format!(
-				"Unable to validate x-amz-content-sha256"
-			)));
-		} else {
-			trace!("Successfully validated x-amz-content-sha256");
-		}
-	}
-	if let Some(expected_md5) = content_md5 {
-		if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
-			return Err(Error::BadRequest(format!("Unable to validate content-md5")));
-		} else {
-			trace!("Successfully validated content-md5");
-		}
-	}
-	Ok(())
-}
-
-async fn read_and_put_blocks(
-	garage: &Garage,
-	version: &Version,
-	part_number: u64,
-	first_block: Vec<u8>,
-	first_block_hash: Hash,
-	chunker: &mut BodyChunker,
-) -> Result<(u64, GenericArray<u8, U16>, Hash), Error> {
-	let mut md5hasher = Md5::new();
-	let mut sha256hasher = Sha256::new();
-	md5hasher.update(&first_block[..]);
-	sha256hasher.update(&first_block[..]);
-
-	let mut next_offset = first_block.len();
-	let mut put_curr_version_block = put_block_meta(
-		&garage,
-		&version,
-		part_number,
-		0,
-		first_block_hash,
-		first_block.len() as u64,
-	);
-	let mut put_curr_block = garage
-		.block_manager
-		.rpc_put_block(first_block_hash, first_block);
-
-	loop {
-		let (_, _, next_block) =
-			futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
-		if let Some(block) = next_block {
-			md5hasher.update(&block[..]);
-			sha256hasher.update(&block[..]);
-			let block_hash = blake2sum(&block[..]);
-			let block_len = block.len();
-			put_curr_version_block = put_block_meta(
-				&garage,
-				&version,
-				part_number,
-				next_offset as u64,
-				block_hash,
-				block_len as u64,
-			);
-			put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
-			next_offset += block_len;
-		} else {
-			break;
-		}
-	}
-
-	let total_size = next_offset as u64;
-	let data_md5sum = md5hasher.finalize();
-
-	let data_sha256sum = sha256hasher.finalize();
-	let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
-
-	Ok((total_size, data_md5sum, data_sha256sum))
-}
-
-async fn put_block_meta(
-	garage: &Garage,
-	version: &Version,
-	part_number: u64,
-	offset: u64,
-	hash: Hash,
-	size: u64,
-) -> Result<(), GarageError> {
-	let mut version = version.clone();
-	version.blocks.put(
-		VersionBlockKey {
-			part_number,
-			offset,
-		},
-		VersionBlock { hash, size },
-	);
-
-	let block_ref = BlockRef {
-		block: hash,
-		version: version.uuid,
-		deleted: false.into(),
-	};
-
-	futures::try_join!(
-		garage.version_table.insert(&version),
-		garage.block_ref_table.insert(&block_ref),
-	)?;
-	Ok(())
-}
-
-struct BodyChunker {
-	body: Body,
-	read_all: bool,
-	min_block_size: usize,
-	avg_block_size: usize,
-	max_block_size: usize,
-	buf: VecDeque<u8>,
-}
-
-impl BodyChunker {
-	fn new(body: Body, block_size: usize) -> Self {
-		let min_block_size = block_size / 4 * 3;
-		let avg_block_size = block_size;
-		let max_block_size = block_size * 2;
-		Self {
-			body,
-			read_all: false,
-			min_block_size,
-			avg_block_size,
-			max_block_size,
-			buf: VecDeque::with_capacity(2 * max_block_size),
-		}
-	}
-	async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
-		while !self.read_all && self.buf.len() < self.max_block_size {
-			if let Some(block) = self.body.next().await {
-				let bytes = block?;
-				trace!("Body next: {} bytes", bytes.len());
-				self.buf.extend(&bytes[..]);
-			} else {
-				self.read_all = true;
-			}
-		}
-		if self.buf.len() == 0 {
-			Ok(None)
-		} else {
-			let mut iter = FastCDC::with_eof(
-				self.buf.make_contiguous(),
-				self.min_block_size,
-				self.avg_block_size,
-				self.max_block_size,
-				self.read_all,
-			);
-			if let Some(Chunk { length, .. }) = iter.next() {
-				let block = self.buf.drain(..length).collect::<Vec<u8>>();
-				Ok(Some(block))
-			} else {
-				unreachable!("FastCDC returned not chunk")
-			}
-		}
-	}
-}
-
 pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
 	Response::builder()
 		.header("x-amz-version-id", hex::encode(version_uuid))
@@ -327,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
 		.unwrap()
 }
 
+// ---- Multipart upload calls ----
+
 pub async fn handle_create_multipart_upload(
 	garage: Arc<Garage>,
 	req: &Request<Body>,
@@ -591,59 +427,7 @@ pub async fn handle_abort_multipart_upload(
 	Ok(Response::new(Body::from(vec![])))
 }
 
-fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
-	Ok(req
-		.headers()
-		.get(hyper::header::CONTENT_TYPE)
-		.map(|x| x.to_str())
-		.unwrap_or(Ok("blob"))?
-		.to_string())
-}
-
-pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
-	let content_type = get_mime_type(req)?;
-	let mut other = BTreeMap::new();
-
-	// Preserve standard headers
-	let standard_header = vec![
-		hyper::header::CACHE_CONTROL,
-		hyper::header::CONTENT_DISPOSITION,
-		hyper::header::CONTENT_ENCODING,
-		hyper::header::CONTENT_LANGUAGE,
-		hyper::header::EXPIRES,
-	];
-	for h in standard_header.iter() {
-		if let Some(v) = req.headers().get(h) {
-			match v.to_str() {
-				Ok(v_str) => {
-					other.insert(h.to_string(), v_str.to_string());
-				}
-				Err(e) => {
-					warn!("Discarding header {}, error in .to_str(): {}", h, e);
-				}
-			}
-		}
-	}
-
-	// Preserve x-amz-meta- headers
-	for (k, v) in req.headers().iter() {
-		if k.as_str().starts_with("x-amz-meta-") {
-			match v.to_str() {
-				Ok(v_str) => {
-					other.insert(k.to_string(), v_str.to_string());
-				}
-				Err(e) => {
-					warn!("Discarding header {}, error in .to_str(): {}", k, e);
-				}
-			}
-		}
-	}
-
-	Ok(ObjectVersionHeaders {
-		content_type,
-		other,
-	})
-}
+// ---- Parsing input to multipart upload calls ----
 
 fn decode_upload_id(id: &str) -> Result<UUID, Error> {
 	let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
@@ -690,3 +474,224 @@ fn parse_complete_multpart_upload_body(
 
 	Some(parts)
 }
+
+// ---- Common code ----
+
+pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
+	let content_type = req
+		.headers()
+		.get(hyper::header::CONTENT_TYPE)
+		.map(|x| x.to_str())
+		.unwrap_or(Ok("blob"))?
+		.to_string();
+
+	let mut other = BTreeMap::new();
+
+	// Preserve standard headers
+	let standard_header = vec![
+		hyper::header::CACHE_CONTROL,
+		hyper::header::CONTENT_DISPOSITION,
+		hyper::header::CONTENT_ENCODING,
+		hyper::header::CONTENT_LANGUAGE,
+		hyper::header::EXPIRES,
+	];
+	for h in standard_header.iter() {
+		if let Some(v) = req.headers().get(h) {
+			match v.to_str() {
+				Ok(v_str) => {
+					other.insert(h.to_string(), v_str.to_string());
+				}
+				Err(e) => {
+					warn!("Discarding header {}, error in .to_str(): {}", h, e);
+				}
+			}
+		}
+	}
+
+	// Preserve x-amz-meta- headers
+	for (k, v) in req.headers().iter() {
+		if k.as_str().starts_with("x-amz-meta-") {
+			match v.to_str() {
+				Ok(v_str) => {
+					other.insert(k.to_string(), v_str.to_string());
+				}
+				Err(e) => {
+					warn!("Discarding header {}, error in .to_str(): {}", k, e);
+				}
+			}
+		}
+	}
+
+	Ok(ObjectVersionHeaders {
+		content_type,
+		other,
+	})
+}
+
+struct BodyChunker {
+	body: Body,
+	read_all: bool,
+	min_block_size: usize,
+	avg_block_size: usize,
+	max_block_size: usize,
+	buf: VecDeque<u8>,
+}
+
+impl BodyChunker {
+	fn new(body: Body, block_size: usize) -> Self {
+		let min_block_size = block_size / 4 * 3;
+		let avg_block_size = block_size;
+		let max_block_size = block_size * 2;
+		Self {
+			body,
+			read_all: false,
+			min_block_size,
+			avg_block_size,
+			max_block_size,
+			buf: VecDeque::with_capacity(2 * max_block_size),
+		}
+	}
+	async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
+		while !self.read_all && self.buf.len() < self.max_block_size {
+			if let Some(block) = self.body.next().await {
+				let bytes = block?;
+				trace!("Body next: {} bytes", bytes.len());
+				self.buf.extend(&bytes[..]);
+			} else {
+				self.read_all = true;
+			}
+		}
+		if self.buf.len() == 0 {
+			Ok(None)
+		} else {
+			let mut iter = FastCDC::with_eof(
+				self.buf.make_contiguous(),
+				self.min_block_size,
+				self.avg_block_size,
+				self.max_block_size,
+				self.read_all,
+			);
+			if let Some(Chunk { length, .. }) = iter.next() {
+				let block = self.buf.drain(..length).collect::<Vec<u8>>();
+				Ok(Some(block))
+			} else {
+				unreachable!("FastCDC returned no chunk")
+			}
+		}
+	}
+}
+
+async fn read_and_put_blocks(
+	garage: &Garage,
+	version: &Version,
+	part_number: u64,
+	first_block: Vec<u8>,
+	first_block_hash: Hash,
+	chunker: &mut BodyChunker,
+) -> Result<(u64, GenericArray<u8, U16>, Hash), Error> {
+	let mut md5hasher = Md5::new();
+	let mut sha256hasher = Sha256::new();
+	md5hasher.update(&first_block[..]);
+	sha256hasher.update(&first_block[..]);
+
+	let mut next_offset = first_block.len();
+	let mut put_curr_version_block = put_block_meta(
+		&garage,
+		&version,
+		part_number,
+		0,
+		first_block_hash,
+		first_block.len() as u64,
+	);
+	let mut put_curr_block = garage
+		.block_manager
+		.rpc_put_block(first_block_hash, first_block);
+
+	loop {
+		let (_, _, next_block) =
+			futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
+		if let Some(block) = next_block {
+			md5hasher.update(&block[..]);
+			sha256hasher.update(&block[..]);
+			let block_hash = blake2sum(&block[..]);
+			let block_len = block.len();
+			put_curr_version_block = put_block_meta(
+				&garage,
+				&version,
+				part_number,
+				next_offset as u64,
+				block_hash,
+				block_len as u64,
+			);
+			put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
+			next_offset += block_len;
+		} else {
+			break;
+		}
+	}
+
+	let total_size = next_offset as u64;
+	let data_md5sum = md5hasher.finalize();
+
+	let data_sha256sum = sha256hasher.finalize();
+	let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
+
+	Ok((total_size, data_md5sum, data_sha256sum))
+}
+
+async fn put_block_meta(
+	garage: &Garage,
+	version: &Version,
+	part_number: u64,
+	offset: u64,
+	hash: Hash,
+	size: u64,
+) -> Result<(), GarageError> {
+	let mut version = version.clone();
+	version.blocks.put(
+		VersionBlockKey {
+			part_number,
+			offset,
+		},
+		VersionBlock { hash, size },
+	);
+
+	let block_ref = BlockRef {
+		block: hash,
+		version: version.uuid,
+		deleted: false.into(),
+	};
+
+	futures::try_join!(
+		garage.version_table.insert(&version),
+		garage.block_ref_table.insert(&block_ref),
+	)?;
+	Ok(())
+}
+
+/// Validate MD5 sum against content-md5 header
+/// and sha256sum against signed content-sha256
+fn ensure_checksum_matches(
+	data_md5sum: &[u8],
+	data_sha256sum: garage_util::data::FixedBytes32,
+	content_md5: Option<&str>,
+	content_sha256: Option<garage_util::data::FixedBytes32>,
+) -> Result<(), Error> {
+	if let Some(expected_sha256) = content_sha256 {
+		if expected_sha256 != data_sha256sum {
+			return Err(Error::BadRequest(format!(
+				"Unable to validate x-amz-content-sha256"
+			)));
+		} else {
+			trace!("Successfully validated x-amz-content-sha256");
+		}
+	}
+	if let Some(expected_md5) = content_md5 {
+		if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
+			return Err(Error::BadRequest(format!("Unable to validate content-md5")));
+		} else {
+			trace!("Successfully validated content-md5");
+		}
+	}
+	Ok(())
+}
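
Note (not part of the patch): BodyChunker derives its FastCDC bounds from the
configured block size as 3/4x (min), 1x (avg) and 2x (max). As a minimal
standalone sketch of that strategy, assuming the same `fastcdc` 1.x crate the
patch calls through FastCDC::with_eof, and using a hypothetical helper name
chunk_lengths:

    // Sketch only: cut a buffer with the same size bounds as BodyChunker::new.
    use fastcdc::{Chunk, FastCDC};

    fn chunk_lengths(data: &[u8], block_size: usize) -> Vec<usize> {
        let (min, avg, max) = (block_size / 4 * 3, block_size, block_size * 2);
        // `true` marks end-of-input, so the final chunk may be shorter than `min`.
        FastCDC::with_eof(data, min, avg, max, true)
            .map(|Chunk { length, .. }| length)
            .collect()
    }

Because cut points depend on content rather than position, a shifted re-upload
still produces mostly identical blocks, which suits the hash-addressed block
store that rpc_put_block and BlockRef implement above.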
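A second note, on ensure_checksum_matches: the Content-MD5 header (RFC 1864)
carries the base64 encoding of the body's MD5 digest, which is why the code
compares it against base64::encode(data_md5sum) after trimming quotes. A
hedged illustration of the client side, assuming the same md5 (md-5) and
base64 crates this file already imports, with a hypothetical helper name:

    // Sketch only: the value a client would send as Content-MD5.
    use md5::{Digest, Md5};

    fn content_md5_header_value(body: &[u8]) -> String {
        let mut hasher = Md5::new();
        hasher.update(body);
        base64::encode(hasher.finalize())
    }

The x-amz-content-sha256 check works the same way, except the expected digest
arrives through the content_sha256 argument produced during signature
verification rather than from a header parsed here.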