implement STREAMING-*-PAYLOAD-TRAILER #960

Merged
lx merged 15 commits from fix-824 into main 2025-02-19 09:59:32 +00:00
6 changed files with 170 additions and 74 deletions
Showing only changes of commit 658541d812 - Show all commits

View file

@ -5,7 +5,13 @@ use futures::stream::BoxStream;
use http_body_util::{BodyExt, StreamBody}; use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame}; use hyper::body::{Bytes, Frame};
use serde::Deserialize; use serde::Deserialize;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use tokio::task;
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
};
use super::*; use super::*;
@ -13,14 +19,33 @@ use crate::signature::checksum::*;
pub struct ReqBody { pub struct ReqBody {
// why need mutex to be sync?? // why need mutex to be sync??
pub stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>, pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
pub checksummer: Checksummer, pub(crate) checksummer: Checksummer,
pub expected_checksums: ExpectedChecksums, pub(crate) expected_checksums: ExpectedChecksums,
} }
pub type StreamingChecksumReceiver = oneshot::Receiver<Result<Checksums, Error>>; pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>;
impl ReqBody { impl ReqBody {
pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) {
if more.md5.is_some() {
self.expected_checksums.md5 = more.md5;
}
if more.sha256.is_some() {
self.expected_checksums.sha256 = more.sha256;
}
if more.extra.is_some() {
self.expected_checksums.extra = more.extra;
}
self.checksummer.add_expected(&self.expected_checksums);
}
pub fn add_md5(&mut self) {
self.checksummer.add_md5();
}
// ============ non-streaming =============
pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> { pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
let body = self.collect().await?; let body = self.collect().await?;
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
@ -42,28 +67,71 @@ impl ReqBody {
Ok((bytes, checksums)) Ok((bytes, checksums))
} }
pub fn streaming(self) -> impl Stream<Item = Result<Bytes, Error>> { // ============ streaming =============
self.streaming_with_checksums(false).0
}
pub fn streaming_with_checksums( pub fn streaming_with_checksums(
self, self,
add_md5: bool,
) -> ( ) -> (
impl Stream<Item = Result<Bytes, Error>>, BoxStream<'static, Result<Bytes, Error>>,
StreamingChecksumReceiver, StreamingChecksumReceiver,
) { ) {
let (tx, rx) = oneshot::channel(); let Self {
// TODO: actually calculate checksums!! stream,
let stream: BoxStream<_> = self.stream.into_inner().unwrap(); mut checksummer,
( mut expected_checksums,
stream.map(|x| { } = self;
x.and_then(|f| {
f.into_data() let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(1);
.map_err(|_| Error::bad_request("non-data frame"))
}) let join_checksums = tokio::spawn(async move {
}), let tracer = opentelemetry::global::tracer("garage");
rx,
) while let Some(frame) = frame_rx.recv().await {
match frame.into_data() {
Ok(data) => {
checksummer = tokio::task::spawn_blocking(move || {
checksummer.update(&data);
checksummer
})
.await
.unwrap()
}
Err(frame) => {
let trailers = frame.into_trailers().unwrap();
if let Some(cv) = request_checksum_value(&trailers)? {
expected_checksums.extra = Some(cv);
}
break;
}
}
}
let checksums = checksummer.finalize();
checksums.verify(&expected_checksums)?;
return Ok(checksums);
});
let stream: BoxStream<_> = stream.into_inner().unwrap();
let stream = stream.filter_map(move |x| {
let frame_tx = frame_tx.clone();
async move {
match x {
Err(e) => Some(Err(e)),
Ok(frame) => {
if frame.is_data() {
let data = frame.data_ref().unwrap().clone();
let _ = frame_tx.send(frame).await;
Some(Ok(data))
} else {
let _ = frame_tx.send(frame).await;
None
}
}
}
}
});
(stream.boxed(), join_checksums)
} }
} }

View file

@ -32,7 +32,7 @@ pub type Md5Checksum = [u8; 16];
pub type Sha1Checksum = [u8; 20]; pub type Sha1Checksum = [u8; 20];
pub type Sha256Checksum = [u8; 32]; pub type Sha256Checksum = [u8; 32];
#[derive(Debug, Default)] #[derive(Debug, Default, Clone)]
pub struct ExpectedChecksums { pub struct ExpectedChecksums {
// base64-encoded md5 (content-md5 header) // base64-encoded md5 (content-md5 header)
pub md5: Option<String>, pub md5: Option<String>,
@ -70,27 +70,37 @@ impl Checksummer {
} }
} }
pub fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self { pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self {
let mut ret = Self::new(); let mut ret = Self::new();
ret.add_expected(expected);
if expected.md5.is_some() || require_md5 { if add_md5 {
ret.md5 = Some(Md5::new()); ret.add_md5();
}
if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
ret.sha256 = Some(Sha256::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
ret.crc32 = Some(Crc32::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
ret.crc32c = Some(Crc32c::default());
}
if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
ret.sha1 = Some(Sha1::new());
} }
ret ret
} }
pub fn add_md5(&mut self) {
self.md5 = Some(Md5::new());
}
pub fn add_expected(&mut self, expected: &ExpectedChecksums) {
if expected.md5.is_some() {
self.md5 = Some(Md5::new());
}
if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
self.sha256 = Some(Sha256::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
self.crc32 = Some(Crc32::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
self.crc32c = Some(Crc32c::default());
}
if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
self.sha1 = Some(Sha1::new());
}
}
pub fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self { pub fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self {
match algo { match algo {
Some(ChecksumAlgorithm::Crc32) => { Some(ChecksumAlgorithm::Crc32) => {

View file

@ -25,15 +25,11 @@ pub fn parse_streaming_body(
service: &str, service: &str,
) -> Result<Request<ReqBody>, Error> { ) -> Result<Request<ReqBody>, Error> {
let expected_checksums = ExpectedChecksums { let expected_checksums = ExpectedChecksums {
md5: match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
sha256: match &checked_signature.content_sha256_header { sha256: match &checked_signature.content_sha256_header {
ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256), ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
_ => None, _ => None,
}, },
extra: None, ..Default::default()
}; };
let mut checksummer = Checksummer::init(&expected_checksums, false); let mut checksummer = Checksummer::init(&expected_checksums, false);

View file

@ -15,7 +15,7 @@ use garage_model::key_table::Key;
use garage_api_common::cors::*; use garage_api_common::cors::*;
use garage_api_common::generic_server::*; use garage_api_common::generic_server::*;
use garage_api_common::helpers::*; use garage_api_common::helpers::*;
use garage_api_common::signature::{verify_request, ContentSha256Header}; use garage_api_common::signature::verify_request;
use crate::bucket::*; use crate::bucket::*;
use crate::copy::*; use crate::copy::*;
@ -124,11 +124,6 @@ impl ApiHandler for S3ApiServer {
let verified_request = verify_request(&garage, req, "s3").await?; let verified_request = verify_request(&garage, req, "s3").await?;
let req = verified_request.request; let req = verified_request.request;
let api_key = verified_request.access_key; let api_key = verified_request.access_key;
let content_sha256 = match verified_request.content_sha256_header {
ContentSha256Header::Sha256Checksum(h) => Some(h),
// TODO take into account streaming/trailer checksums, etc.
_ => None,
};
let bucket_name = match bucket_name { let bucket_name = match bucket_name {
None => { None => {
@ -205,14 +200,14 @@ impl ApiHandler for S3ApiServer {
key, key,
part_number, part_number,
upload_id, upload_id,
} => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, } => handle_put_part(ctx, req, &key, part_number, &upload_id).await,
Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
Endpoint::UploadPartCopy { Endpoint::UploadPartCopy {
key, key,
part_number, part_number,
upload_id, upload_id,
} => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, Endpoint::PutObject { key } => handle_put(ctx, req, &key).await,
Endpoint::AbortMultipartUpload { key, upload_id } => { Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(ctx, &key, &upload_id).await handle_abort_multipart_upload(ctx, &key, &upload_id).await
} }

View file

@ -94,7 +94,6 @@ pub async fn handle_put_part(
key: &str, key: &str,
part_number: u64, part_number: u64,
upload_id: &str, upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx; let ReqCtx { garage, .. } = &ctx;
@ -105,18 +104,23 @@ pub async fn handle_put_part(
Some(x) => Some(x.to_str()?.to_string()), Some(x) => Some(x.to_str()?.to_string()),
None => None, None => None,
}, },
sha256: content_sha256, sha256: None,
extra: request_checksum_value(req.headers())?, extra: request_checksum_value(req.headers())?,
}; };
// Read first chuck, and at the same time try to get object to see if it exists // Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string(); let key = key.to_string();
let (req_head, req_body) = req.into_parts(); let (req_head, mut req_body) = req.into_parts();
let (stream, checksums) = req_body.streaming_with_checksums(true); req_body.add_expected_checksums(expected_checksums.clone());
// TODO: avoid parsing encryption headers twice...
if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() {
req_body.add_md5();
}
let (stream, stream_checksums) = req_body.streaming_with_checksums();
let stream = stream.map_err(Error::from); let stream = stream.map_err(Error::from);
// TODO checksums
let mut chunker = StreamChunker::new(stream, garage.config.block_size); let mut chunker = StreamChunker::new(stream, garage.config.block_size);
@ -176,21 +180,22 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?; garage.version_table.insert(&version).await?;
// Copy data to version // Copy data to version
let checksummer = // TODO don't duplicate checksums
Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm); let (total_size, _, _) = read_and_put_blocks(
let (total_size, checksums, _) = read_and_put_blocks(
&ctx, &ctx,
&version, &version,
encryption, encryption,
part_number, part_number,
first_block, first_block,
&mut chunker, chunker,
checksummer, Checksummer::new(),
) )
.await?; .await?;
// Verify that checksums map // Verify that checksums map
checksums.verify(&expected_checksums)?; let checksums = stream_checksums
.await
.ok_or_internal_error("checksum calculation")??;
// Store part etag in version // Store part etag in version
let etag = encryption.etag_from_md5(&checksums.md5); let etag = encryption.etag_from_md5(&checksums.md5);

View file

@ -31,6 +31,7 @@ use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*; use garage_model::s3::version_table::*;
use garage_api_common::helpers::*; use garage_api_common::helpers::*;
use garage_api_common::signature::body::StreamingChecksumReceiver;
use garage_api_common::signature::checksum::*; use garage_api_common::signature::checksum::*;
use crate::api_server::{ReqBody, ResBody}; use crate::api_server::{ReqBody, ResBody};
@ -49,6 +50,7 @@ pub(crate) struct SaveStreamResult {
pub(crate) enum ChecksumMode<'a> { pub(crate) enum ChecksumMode<'a> {
Verify(&'a ExpectedChecksums), Verify(&'a ExpectedChecksums),
VerifyFrom(StreamingChecksumReceiver),
Calculate(Option<ChecksumAlgorithm>), Calculate(Option<ChecksumAlgorithm>),
} }
@ -56,7 +58,6 @@ pub async fn handle_put(
ctx: ReqCtx, ctx: ReqCtx,
req: Request<ReqBody>, req: Request<ReqBody>,
key: &String, key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request // Retrieve interesting headers from request
let headers = get_headers(req.headers())?; let headers = get_headers(req.headers())?;
@ -67,7 +68,7 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()), Some(x) => Some(x.to_str()?.to_string()),
None => None, None => None,
}, },
sha256: content_sha256, sha256: None,
extra: request_checksum_value(req.headers())?, extra: request_checksum_value(req.headers())?,
}; };
@ -79,9 +80,14 @@ pub async fn handle_put(
// Determine whether object should be encrypted, and if so the key // Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?; let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
let (stream, checksums) = req.into_body().streaming_with_checksums(true); let mut req_body = req.into_body();
req_body.add_expected_checksums(expected_checksums.clone());
if !encryption.is_encrypted() {
req_body.add_md5();
}
let (stream, checksums) = req_body.streaming_with_checksums();
let stream = stream.map_err(Error::from); let stream = stream.map_err(Error::from);
// TODO checksums
let res = save_stream( let res = save_stream(
&ctx, &ctx,
@ -89,7 +95,7 @@ pub async fn handle_put(
encryption, encryption,
stream, stream,
key, key,
ChecksumMode::Verify(&expected_checksums), ChecksumMode::VerifyFrom(checksums),
) )
.await?; .await?;
@ -125,10 +131,15 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref()); let version_timestamp = next_timestamp(existing_object.as_ref());
let mut checksummer = match checksum_mode { let mut checksummer = match &checksum_mode {
ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()), ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()),
ChecksumMode::Calculate(algo) => { ChecksumMode::Calculate(algo) => {
Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo) Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo)
}
ChecksumMode::VerifyFrom(_) => {
// Checksums are calculated by the garage_api_common::signature module
// so here we can just have an empty checksummer that does nothing
Checksummer::new()
} }
}; };
@ -136,7 +147,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// as "inline data". We can then return immediately. // as "inline data". We can then return immediately.
if first_block.len() < INLINE_THRESHOLD { if first_block.len() < INLINE_THRESHOLD {
checksummer.update(&first_block); checksummer.update(&first_block);
let checksums = checksummer.finalize(); let mut checksums = checksummer.finalize();
match checksum_mode { match checksum_mode {
ChecksumMode::Verify(expected) => { ChecksumMode::Verify(expected) => {
@ -145,6 +156,12 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => { ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo); meta.checksum = checksums.extract(algo);
} }
ChecksumMode::VerifyFrom(checksummer) => {
drop(chunker);
checksums = checksummer
.await
.ok_or_internal_error("checksum calculation")??;
}
}; };
let size = first_block.len() as u64; let size = first_block.len() as u64;
@ -216,13 +233,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?; garage.version_table.insert(&version).await?;
// Transfer data // Transfer data
let (total_size, checksums, first_block_hash) = read_and_put_blocks( let (total_size, mut checksums, first_block_hash) = read_and_put_blocks(
ctx, ctx,
&version, &version,
encryption, encryption,
1, 1,
first_block, first_block,
&mut chunker, chunker,
checksummer, checksummer,
) )
.await?; .await?;
@ -235,6 +252,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => { ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo); meta.checksum = checksums.extract(algo);
} }
ChecksumMode::VerifyFrom(checksummer) => {
checksums = checksummer
.await
.ok_or_internal_error("checksum calculation")??;
}
}; };
// Verify quotas are respsected // Verify quotas are respsected
@ -335,7 +357,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
encryption: EncryptionParams, encryption: EncryptionParams,
part_number: u64, part_number: u64,
first_block: Bytes, first_block: Bytes,
chunker: &mut StreamChunker<S>, mut chunker: StreamChunker<S>,
checksummer: Checksummer, checksummer: Checksummer,
) -> Result<(u64, Checksums, Hash), Error> { ) -> Result<(u64, Checksums, Hash), Error> {
let tracer = opentelemetry::global::tracer("garage"); let tracer = opentelemetry::global::tracer("garage");