diff --git a/src/api/common/signature/body.rs b/src/api/common/signature/body.rs index 877d8d85..d8c15ee5 100644 --- a/src/api/common/signature/body.rs +++ b/src/api/common/signature/body.rs @@ -5,7 +5,13 @@ use futures::stream::BoxStream; use http_body_util::{BodyExt, StreamBody}; use hyper::body::{Bytes, Frame}; use serde::Deserialize; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; +use tokio::task; + +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; use super::*; @@ -13,14 +19,33 @@ use crate::signature::checksum::*; pub struct ReqBody { // why need mutex to be sync?? - pub stream: Mutex, Error>>>, - pub checksummer: Checksummer, - pub expected_checksums: ExpectedChecksums, + pub(crate) stream: Mutex, Error>>>, + pub(crate) checksummer: Checksummer, + pub(crate) expected_checksums: ExpectedChecksums, } -pub type StreamingChecksumReceiver = oneshot::Receiver>; +pub type StreamingChecksumReceiver = task::JoinHandle>; impl ReqBody { + pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) { + if more.md5.is_some() { + self.expected_checksums.md5 = more.md5; + } + if more.sha256.is_some() { + self.expected_checksums.sha256 = more.sha256; + } + if more.extra.is_some() { + self.expected_checksums.extra = more.extra; + } + self.checksummer.add_expected(&self.expected_checksums); + } + + pub fn add_md5(&mut self) { + self.checksummer.add_md5(); + } + + // ============ non-streaming ============= + pub async fn json Deserialize<'a>>(self) -> Result { let body = self.collect().await?; let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; @@ -42,28 +67,71 @@ impl ReqBody { Ok((bytes, checksums)) } - pub fn streaming(self) -> impl Stream> { - self.streaming_with_checksums(false).0 - } + // ============ streaming ============= pub fn streaming_with_checksums( self, - add_md5: bool, ) -> ( - impl Stream>, + BoxStream<'static, Result>, StreamingChecksumReceiver, ) { - let (tx, rx) = oneshot::channel(); - // TODO: actually calculate checksums!! - let stream: BoxStream<_> = self.stream.into_inner().unwrap(); - ( - stream.map(|x| { - x.and_then(|f| { - f.into_data() - .map_err(|_| Error::bad_request("non-data frame")) - }) - }), - rx, - ) + let Self { + stream, + mut checksummer, + mut expected_checksums, + } = self; + + let (frame_tx, mut frame_rx) = mpsc::channel::>(1); + + let join_checksums = tokio::spawn(async move { + let tracer = opentelemetry::global::tracer("garage"); + + while let Some(frame) = frame_rx.recv().await { + match frame.into_data() { + Ok(data) => { + checksummer = tokio::task::spawn_blocking(move || { + checksummer.update(&data); + checksummer + }) + .await + .unwrap() + } + Err(frame) => { + let trailers = frame.into_trailers().unwrap(); + if let Some(cv) = request_checksum_value(&trailers)? { + expected_checksums.extra = Some(cv); + } + break; + } + } + } + + let checksums = checksummer.finalize(); + checksums.verify(&expected_checksums)?; + + return Ok(checksums); + }); + + let stream: BoxStream<_> = stream.into_inner().unwrap(); + let stream = stream.filter_map(move |x| { + let frame_tx = frame_tx.clone(); + async move { + match x { + Err(e) => Some(Err(e)), + Ok(frame) => { + if frame.is_data() { + let data = frame.data_ref().unwrap().clone(); + let _ = frame_tx.send(frame).await; + Some(Ok(data)) + } else { + let _ = frame_tx.send(frame).await; + None + } + } + } + } + }); + + (stream.boxed(), join_checksums) } } diff --git a/src/api/common/signature/checksum.rs b/src/api/common/signature/checksum.rs index b184fc65..a9f00423 100644 --- a/src/api/common/signature/checksum.rs +++ b/src/api/common/signature/checksum.rs @@ -32,7 +32,7 @@ pub type Md5Checksum = [u8; 16]; pub type Sha1Checksum = [u8; 20]; pub type Sha256Checksum = [u8; 32]; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct ExpectedChecksums { // base64-encoded md5 (content-md5 header) pub md5: Option, @@ -70,27 +70,37 @@ impl Checksummer { } } - pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { + pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self { let mut ret = Self::new(); - - if expected.md5.is_some() || require_md5 { - ret.md5 = Some(Md5::new()); - } - if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { - ret.sha256 = Some(Sha256::new()); - } - if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { - ret.crc32 = Some(Crc32::new()); - } - if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { - ret.crc32c = Some(Crc32c::default()); - } - if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { - ret.sha1 = Some(Sha1::new()); + ret.add_expected(expected); + if add_md5 { + ret.add_md5(); } ret } + pub fn add_md5(&mut self) { + self.md5 = Some(Md5::new()); + } + + pub fn add_expected(&mut self, expected: &ExpectedChecksums) { + if expected.md5.is_some() { + self.md5 = Some(Md5::new()); + } + if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) { + self.sha256 = Some(Sha256::new()); + } + if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) { + self.crc32 = Some(Crc32::new()); + } + if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) { + self.crc32c = Some(Crc32c::default()); + } + if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) { + self.sha1 = Some(Sha1::new()); + } + } + pub fn add(mut self, algo: Option) -> Self { match algo { Some(ChecksumAlgorithm::Crc32) => { diff --git a/src/api/common/signature/streaming.rs b/src/api/common/signature/streaming.rs index e8f9b3d7..3ffc5b2f 100644 --- a/src/api/common/signature/streaming.rs +++ b/src/api/common/signature/streaming.rs @@ -25,15 +25,11 @@ pub fn parse_streaming_body( service: &str, ) -> Result, Error> { let expected_checksums = ExpectedChecksums { - md5: match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, - }, sha256: match &checked_signature.content_sha256_header { ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), _ => None, }, - extra: None, + ..Default::default() }; let mut checksummer = Checksummer::init(&expected_checksums, false); diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index fe6545cc..e26c2b65 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -15,7 +15,7 @@ use garage_model::key_table::Key; use garage_api_common::cors::*; use garage_api_common::generic_server::*; use garage_api_common::helpers::*; -use garage_api_common::signature::{verify_request, ContentSha256Header}; +use garage_api_common::signature::verify_request; use crate::bucket::*; use crate::copy::*; @@ -124,11 +124,6 @@ impl ApiHandler for S3ApiServer { let verified_request = verify_request(&garage, req, "s3").await?; let req = verified_request.request; let api_key = verified_request.access_key; - let content_sha256 = match verified_request.content_sha256_header { - ContentSha256Header::Sha256Checksum(h) => Some(h), - // TODO take into account streaming/trailer checksums, etc. - _ => None, - }; let bucket_name = match bucket_name { None => { @@ -205,14 +200,14 @@ impl ApiHandler for S3ApiServer { key, part_number, upload_id, - } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, + } => handle_put_part(ctx, req, &key, part_number, &upload_id).await, Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::UploadPartCopy { key, part_number, upload_id, } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, - Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, + Endpoint::PutObject { key } => handle_put(ctx, req, &key).await, Endpoint::AbortMultipartUpload { key, upload_id } => { handle_abort_multipart_upload(ctx, &key, &upload_id).await } diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index f381d670..59a469d1 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -94,7 +94,6 @@ pub async fn handle_put_part( key: &str, part_number: u64, upload_id: &str, - content_sha256: Option, ) -> Result, Error> { let ReqCtx { garage, .. } = &ctx; @@ -105,18 +104,23 @@ pub async fn handle_put_part( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let (req_head, req_body) = req.into_parts(); + let (req_head, mut req_body) = req.into_parts(); - let (stream, checksums) = req_body.streaming_with_checksums(true); + req_body.add_expected_checksums(expected_checksums.clone()); + // TODO: avoid parsing encryption headers twice... + if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() { + req_body.add_md5(); + } + + let (stream, stream_checksums) = req_body.streaming_with_checksums(); let stream = stream.map_err(Error::from); - // TODO checksums let mut chunker = StreamChunker::new(stream, garage.config.block_size); @@ -176,21 +180,22 @@ pub async fn handle_put_part( garage.version_table.insert(&version).await?; // Copy data to version - let checksummer = - Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); - let (total_size, checksums, _) = read_and_put_blocks( + // TODO don't duplicate checksums + let (total_size, _, _) = read_and_put_blocks( &ctx, &version, encryption, part_number, first_block, - &mut chunker, - checksummer, + chunker, + Checksummer::new(), ) .await?; // Verify that checksums map - checksums.verify(&expected_checksums)?; + let checksums = stream_checksums + .await + .ok_or_internal_error("checksum calculation")??; // Store part etag in version let etag = encryption.etag_from_md5(&checksums.md5); diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 551c3b76..24f888bc 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -31,6 +31,7 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_api_common::helpers::*; +use garage_api_common::signature::body::StreamingChecksumReceiver; use garage_api_common::signature::checksum::*; use crate::api_server::{ReqBody, ResBody}; @@ -49,6 +50,7 @@ pub(crate) struct SaveStreamResult { pub(crate) enum ChecksumMode<'a> { Verify(&'a ExpectedChecksums), + VerifyFrom(StreamingChecksumReceiver), Calculate(Option), } @@ -56,7 +58,6 @@ pub async fn handle_put( ctx: ReqCtx, req: Request, key: &String, - content_sha256: Option, ) -> Result, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; @@ -67,7 +68,7 @@ pub async fn handle_put( Some(x) => Some(x.to_str()?.to_string()), None => None, }, - sha256: content_sha256, + sha256: None, extra: request_checksum_value(req.headers())?, }; @@ -79,9 +80,14 @@ pub async fn handle_put( // Determine whether object should be encrypted, and if so the key let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; - let (stream, checksums) = req.into_body().streaming_with_checksums(true); + let mut req_body = req.into_body(); + req_body.add_expected_checksums(expected_checksums.clone()); + if !encryption.is_encrypted() { + req_body.add_md5(); + } + + let (stream, checksums) = req_body.streaming_with_checksums(); let stream = stream.map_err(Error::from); - // TODO checksums let res = save_stream( &ctx, @@ -89,7 +95,7 @@ pub async fn handle_put( encryption, stream, key, - ChecksumMode::Verify(&expected_checksums), + ChecksumMode::VerifyFrom(checksums), ) .await?; @@ -125,10 +131,15 @@ pub(crate) async fn save_stream> + Unpin>( let version_uuid = gen_uuid(); let version_timestamp = next_timestamp(existing_object.as_ref()); - let mut checksummer = match checksum_mode { + let mut checksummer = match &checksum_mode { ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Calculate(algo) => { - Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) + Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo) + } + ChecksumMode::VerifyFrom(_) => { + // Checksums are calculated by the garage_api_common::signature module + // so here we can just have an empty checksummer that does nothing + Checksummer::new() } }; @@ -136,7 +147,7 @@ pub(crate) async fn save_stream> + Unpin>( // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { checksummer.update(&first_block); - let checksums = checksummer.finalize(); + let mut checksums = checksummer.finalize(); match checksum_mode { ChecksumMode::Verify(expected) => { @@ -145,6 +156,12 @@ pub(crate) async fn save_stream> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom(checksummer) => { + drop(chunker); + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + } }; let size = first_block.len() as u64; @@ -216,13 +233,13 @@ pub(crate) async fn save_stream> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data - let (total_size, checksums, first_block_hash) = read_and_put_blocks( + let (total_size, mut checksums, first_block_hash) = read_and_put_blocks( ctx, &version, encryption, 1, first_block, - &mut chunker, + chunker, checksummer, ) .await?; @@ -235,6 +252,11 @@ pub(crate) async fn save_stream> + Unpin>( ChecksumMode::Calculate(algo) => { meta.checksum = checksums.extract(algo); } + ChecksumMode::VerifyFrom(checksummer) => { + checksums = checksummer + .await + .ok_or_internal_error("checksum calculation")??; + } }; // Verify quotas are respsected @@ -335,7 +357,7 @@ pub(crate) async fn read_and_put_blocks> + encryption: EncryptionParams, part_number: u64, first_block: Bytes, - chunker: &mut StreamChunker, + mut chunker: StreamChunker, checksummer: Checksummer, ) -> Result<(u64, Checksums, Hash), Error> { let tracer = opentelemetry::global::tracer("garage");