api: various fixes
This commit is contained in:
parent
45e10e55f9
commit
f6e805e7db
6 changed files with 34 additions and 32 deletions
|
@ -78,7 +78,7 @@ impl ReqBody {
|
||||||
trailer_algorithm,
|
trailer_algorithm,
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(1);
|
let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(5);
|
||||||
|
|
||||||
let join_checksums = tokio::spawn(async move {
|
let join_checksums = tokio::spawn(async move {
|
||||||
while let Some(frame) = frame_rx.recv().await {
|
while let Some(frame) = frame_rx.recv().await {
|
||||||
|
|
|
@ -62,7 +62,6 @@ pub struct VerifiedRequest {
|
||||||
pub request: Request<streaming::ReqBody>,
|
pub request: Request<streaming::ReqBody>,
|
||||||
pub access_key: Key,
|
pub access_key: Key,
|
||||||
pub content_sha256_header: ContentSha256Header,
|
pub content_sha256_header: ContentSha256Header,
|
||||||
// TODO: oneshot chans to retrieve hashes after reading all body
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn verify_request(
|
pub async fn verify_request(
|
||||||
|
|
|
@ -74,21 +74,16 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade
|
||||||
} else {
|
} else {
|
||||||
(false, rest)
|
(false, rest)
|
||||||
};
|
};
|
||||||
if algo == AWS4_HMAC_SHA256_PAYLOAD {
|
let signed = match algo {
|
||||||
Ok(ContentSha256Header::StreamingPayload {
|
AWS4_HMAC_SHA256_PAYLOAD => true,
|
||||||
trailer,
|
UNSIGNED_PAYLOAD => false,
|
||||||
signed: true,
|
_ => {
|
||||||
})
|
return Err(Error::bad_request(
|
||||||
} else if algo == UNSIGNED_PAYLOAD {
|
"invalid or unsupported x-amz-content-sha256",
|
||||||
Ok(ContentSha256Header::StreamingPayload {
|
))
|
||||||
trailer,
|
}
|
||||||
signed: false,
|
};
|
||||||
})
|
Ok(ContentSha256Header::StreamingPayload { trailer, signed })
|
||||||
} else {
|
|
||||||
Err(Error::bad_request(
|
|
||||||
"invalid or unsupported x-amz-content-sha256",
|
|
||||||
))
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
let sha256 = hex::decode(header)
|
let sha256 = hex::decode(header)
|
||||||
.ok()
|
.ok()
|
||||||
|
|
|
@ -30,22 +30,12 @@ pub fn parse_streaming_body(
|
||||||
checked_signature.content_sha256_header
|
checked_signature.content_sha256_header
|
||||||
);
|
);
|
||||||
|
|
||||||
let expected_checksums = ExpectedChecksums {
|
|
||||||
sha256: match &checked_signature.content_sha256_header {
|
|
||||||
ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
|
|
||||||
_ => None,
|
|
||||||
},
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut checksummer = Checksummer::init(&expected_checksums, false);
|
|
||||||
|
|
||||||
match checked_signature.content_sha256_header {
|
match checked_signature.content_sha256_header {
|
||||||
ContentSha256Header::StreamingPayload { signed, trailer } => {
|
ContentSha256Header::StreamingPayload { signed, trailer } => {
|
||||||
// Sanity checks
|
// Sanity checks
|
||||||
if !signed && !trailer {
|
if !signed && !trailer {
|
||||||
return Err(Error::bad_request(
|
return Err(Error::bad_request(
|
||||||
"STREAMING-UNSIGNED-PAYLOAD is not a valid combination",
|
"STREAMING-UNSIGNED-PAYLOAD without trailer is not a valid combination",
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,6 +54,7 @@ pub fn parse_streaming_body(
|
||||||
}
|
}
|
||||||
|
|
||||||
// If trailer header is announced, add the calculation of the requested checksum
|
// If trailer header is announced, add the calculation of the requested checksum
|
||||||
|
let mut checksummer = Checksummer::init(&Default::default(), false);
|
||||||
let trailer_algorithm = if trailer {
|
let trailer_algorithm = if trailer {
|
||||||
let algo = Some(
|
let algo = Some(
|
||||||
request_trailer_checksum_algorithm(req.headers())?
|
request_trailer_checksum_algorithm(req.headers())?
|
||||||
|
@ -128,12 +119,21 @@ pub fn parse_streaming_body(
|
||||||
ReqBody {
|
ReqBody {
|
||||||
stream: Mutex::new(signed_payload_stream.boxed()),
|
stream: Mutex::new(signed_payload_stream.boxed()),
|
||||||
checksummer,
|
checksummer,
|
||||||
expected_checksums,
|
expected_checksums: Default::default(),
|
||||||
trailer_algorithm,
|
trailer_algorithm,
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
_ => Ok(req.map(|body| {
|
_ => Ok(req.map(|body| {
|
||||||
|
let expected_checksums = ExpectedChecksums {
|
||||||
|
sha256: match &checked_signature.content_sha256_header {
|
||||||
|
ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
|
||||||
|
_ => None,
|
||||||
|
},
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let checksummer = Checksummer::init(&expected_checksums, false);
|
||||||
|
|
||||||
let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
|
let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
|
||||||
ReqBody {
|
ReqBody {
|
||||||
stream: Mutex::new(stream.boxed()),
|
stream: Mutex::new(stream.boxed()),
|
||||||
|
|
|
@ -114,14 +114,17 @@ pub async fn handle_put_part(
|
||||||
extra: request_checksum_value(req.headers())?,
|
extra: request_checksum_value(req.headers())?,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Read first chuck, and at the same time try to get object to see if it exists
|
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
|
|
||||||
let (req_head, mut req_body) = req.into_parts();
|
let (req_head, mut req_body) = req.into_parts();
|
||||||
|
|
||||||
|
// Before we stream the body, configure the needed checksums.
|
||||||
req_body.add_expected_checksums(expected_checksums.clone());
|
req_body.add_expected_checksums(expected_checksums.clone());
|
||||||
// TODO: avoid parsing encryption headers twice...
|
// TODO: avoid parsing encryption headers twice...
|
||||||
if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() {
|
if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() {
|
||||||
|
// For non-encrypted objects, we need to compute the md5sum in all cases
|
||||||
|
// (even if content-md5 is not set), because it is used as an etag of the
|
||||||
|
// part, which is in turn used in the etag computation of the whole object
|
||||||
req_body.add_md5();
|
req_body.add_md5();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,6 +133,7 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
|
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
|
||||||
|
|
||||||
|
// Read first chuck, and at the same time try to get object to see if it exists
|
||||||
let ((_, object_version, mut mpu), first_block) =
|
let ((_, object_version, mut mpu), first_block) =
|
||||||
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
|
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
|
||||||
|
|
||||||
|
@ -186,7 +190,6 @@ pub async fn handle_put_part(
|
||||||
garage.version_table.insert(&version).await?;
|
garage.version_table.insert(&version).await?;
|
||||||
|
|
||||||
// Copy data to version
|
// Copy data to version
|
||||||
// TODO don't duplicate checksums
|
|
||||||
let (total_size, _, _) = read_and_put_blocks(
|
let (total_size, _, _) = read_and_put_blocks(
|
||||||
&ctx,
|
&ctx,
|
||||||
&version,
|
&version,
|
||||||
|
@ -198,7 +201,7 @@ pub async fn handle_put_part(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Verify that checksums map
|
// Verify that checksums match
|
||||||
let checksums = stream_checksums
|
let checksums = stream_checksums
|
||||||
.await
|
.await
|
||||||
.ok_or_internal_error("checksum calculation")??;
|
.ok_or_internal_error("checksum calculation")??;
|
||||||
|
|
|
@ -79,9 +79,14 @@ pub async fn handle_put(
|
||||||
// Determine whether object should be encrypted, and if so the key
|
// Determine whether object should be encrypted, and if so the key
|
||||||
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
|
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
|
||||||
|
|
||||||
|
// The request body is a special ReqBody object (see garage_api_common::signature::body)
|
||||||
|
// which supports calculating checksums while streaming the data.
|
||||||
|
// Before we start streaming, we configure it to calculate all the checksums we need.
|
||||||
let mut req_body = req.into_body();
|
let mut req_body = req.into_body();
|
||||||
req_body.add_expected_checksums(expected_checksums.clone());
|
req_body.add_expected_checksums(expected_checksums.clone());
|
||||||
if !encryption.is_encrypted() {
|
if !encryption.is_encrypted() {
|
||||||
|
// For non-encrypted objects, we need to compute the md5sum in all cases
|
||||||
|
// (even if content-md5 is not set), because it is used as the object etag
|
||||||
req_body.add_md5();
|
req_body.add_md5();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue