Reorganize code in s3_put.rs

This commit is contained in:
Alex 2021-04-07 10:00:47 +02:00
parent 24d605fb05
commit d08778dea4
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1

View file

@ -23,6 +23,8 @@ use crate::encoding::*;
use crate::error::*; use crate::error::*;
use crate::signature::verify_signed_content; use crate::signature::verify_signed_content;
// ---- PutObject call ----
pub async fn handle_put( pub async fn handle_put(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<Body>,
@ -151,174 +153,6 @@ pub async fn handle_put(
Ok(put_response(version_uuid, md5sum_hex)) Ok(put_response(version_uuid, md5sum_hex))
} }
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
data_md5sum: &[u8],
data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
} else {
trace!("Successfully validated x-amz-content-sha256");
}
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
}
}
Ok(())
}
async fn read_and_put_blocks(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
0,
first_block_hash,
first_block.len() as u64,
);
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
next_offset as u64,
block_hash,
block_len as u64,
);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
break;
}
}
let total_size = next_offset as u64;
let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.finalize();
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
part_number,
offset,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
block: hash,
version: version.uuid,
deleted: false.into(),
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(())
}
struct BodyChunker {
body: Body,
read_all: bool,
min_block_size: usize,
avg_block_size: usize,
max_block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
let min_block_size = block_size / 4 * 3;
let avg_block_size = block_size;
let max_block_size = block_size * 2;
Self {
body,
read_all: false,
min_block_size,
avg_block_size,
max_block_size,
buf: VecDeque::with_capacity(2 * max_block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.max_block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
}
}
if self.buf.len() == 0 {
Ok(None)
} else {
let mut iter = FastCDC::with_eof(
self.buf.make_contiguous(),
self.min_block_size,
self.avg_block_size,
self.max_block_size,
self.read_all,
);
if let Some(Chunk { length, .. }) = iter.next() {
let block = self.buf.drain(..length).collect::<Vec<u8>>();
Ok(Some(block))
} else {
unreachable!("FastCDC returned not chunk")
}
}
}
}
pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> { pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
Response::builder() Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid)) .header("x-amz-version-id", hex::encode(version_uuid))
@ -327,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
.unwrap() .unwrap()
} }
// ---- Mutlipart upload calls ----
pub async fn handle_create_multipart_upload( pub async fn handle_create_multipart_upload(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<Body>,
@ -591,59 +427,7 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![]))) Ok(Response::new(Body::from(vec![])))
} }
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> { // ---- Parsing input to multipart upload calls ----
Ok(req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
}
fn decode_upload_id(id: &str) -> Result<UUID, Error> { fn decode_upload_id(id: &str) -> Result<UUID, Error> {
let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?; let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
@ -690,3 +474,224 @@ fn parse_complete_multpart_upload_body(
Some(parts) Some(parts)
} }
// ---- Common code ----
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string();
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
}
struct BodyChunker {
body: Body,
read_all: bool,
min_block_size: usize,
avg_block_size: usize,
max_block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
let min_block_size = block_size / 4 * 3;
let avg_block_size = block_size;
let max_block_size = block_size * 2;
Self {
body,
read_all: false,
min_block_size,
avg_block_size,
max_block_size,
buf: VecDeque::with_capacity(2 * max_block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
while !self.read_all && self.buf.len() < self.max_block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
} else {
self.read_all = true;
}
}
if self.buf.len() == 0 {
Ok(None)
} else {
let mut iter = FastCDC::with_eof(
self.buf.make_contiguous(),
self.min_block_size,
self.avg_block_size,
self.max_block_size,
self.read_all,
);
if let Some(Chunk { length, .. }) = iter.next() {
let block = self.buf.drain(..length).collect::<Vec<u8>>();
Ok(Some(block))
} else {
unreachable!("FastCDC returned not chunk")
}
}
}
}
async fn read_and_put_blocks(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
md5hasher.update(&first_block[..]);
sha256hasher.update(&first_block[..]);
let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
0,
first_block_hash,
first_block.len() as u64,
);
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
let block_hash = blake2sum(&block[..]);
let block_len = block.len();
put_curr_version_block = put_block_meta(
&garage,
&version,
part_number,
next_offset as u64,
block_hash,
block_len as u64,
);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
break;
}
}
let total_size = next_offset as u64;
let data_md5sum = md5hasher.finalize();
let data_sha256sum = sha256hasher.finalize();
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta(
garage: &Garage,
version: &Version,
part_number: u64,
offset: u64,
hash: Hash,
size: u64,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {
part_number,
offset,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
block: hash,
version: version.uuid,
deleted: false.into(),
};
futures::try_join!(
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
Ok(())
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches(
data_md5sum: &[u8],
data_sha256sum: garage_util::data::FixedBytes32,
content_md5: Option<&str>,
content_sha256: Option<garage_util::data::FixedBytes32>,
) -> Result<(), Error> {
if let Some(expected_sha256) = content_sha256 {
if expected_sha256 != data_sha256sum {
return Err(Error::BadRequest(format!(
"Unable to validate x-amz-content-sha256"
)));
} else {
trace!("Successfully validated x-amz-content-sha256");
}
}
if let Some(expected_md5) = content_md5 {
if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
return Err(Error::BadRequest(format!("Unable to validate content-md5")));
} else {
trace!("Successfully validated content-md5");
}
}
Ok(())
}