garage_api: Handle chunked PUT payload

This commit is contained in:
Jill 2021-11-26 18:48:43 +01:00
parent 732b4d0b63
commit f1bfc939aa
No known key found for this signature in database
GPG key ID: 09A5A2688F13FAC1
5 changed files with 267 additions and 11 deletions

39
Cargo.lock generated
View file

@ -433,7 +433,9 @@ dependencies = [
"idna",
"log",
"md-5",
"nom",
"percent-encoding",
"pin-project",
"quick-xml",
"roxmltree",
"serde",
@ -967,6 +969,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "mio"
version = "0.7.13"
@ -1011,6 +1019,17 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "nom"
version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"
dependencies = [
"memchr",
"minimal-lexical",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -1092,6 +1111,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"

View file

@ -671,7 +671,9 @@ in
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; };
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; };
quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; };
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
@ -1321,6 +1323,16 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec {
name = "minimal-lexical";
version = "0.2.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; };
features = builtins.concatLists [
[ "std" ]
];
});
"registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec {
name = "mio";
version = "0.7.13";
@ -1381,6 +1393,25 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec {
name = "nom";
version = "7.1.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
[ "std" ]
];
dependencies = {
memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; };
minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; };
};
buildDependencies = {
version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec {
name = "ntapi";
version = "0.3.6";
@ -1490,6 +1521,28 @@ in
src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; };
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec {
name = "pin-project";
version = "1.0.8";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; };
dependencies = {
pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec {
name = "pin-project-internal";
version = "1.0.8";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; };
dependencies = {
proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; };
quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; };
syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec {
name = "pin-project-lite";
version = "0.2.7";

View file

@ -28,10 +28,12 @@ hmac = "0.10"
idna = "0.2"
log = "0.4"
md-5 = "0.9"
nom = "7.1"
sha2 = "0.9"
futures = "0.3"
futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"

View file

@ -1,7 +1,9 @@
use std::collections::{BTreeMap, VecDeque};
use std::pin::Pin;
use std::sync::Arc;
use futures::prelude::*;
use futures::task;
use futures::{prelude::*, TryFutureExt};
use hyper::body::{Body, Bytes};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
@ -27,7 +29,7 @@ pub async fn handle_put(
req: Request<Body>,
bucket_id: Uuid,
key: &str,
content_sha256: Option<Hash>,
mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
@ -41,10 +43,24 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
};
let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let content_sha256 = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
Some(content_sha256)
}
_ => None,
};
// Parse body of uploaded file
let body = req.into_body();
let body = match payload_seed_signature {
Some(_) => SignedPayloadChunker::new(body).map_err(Error::from).boxed(),
None => body.map_err(Error::from).boxed(),
};
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
@ -179,7 +195,7 @@ fn ensure_checksum_matches(
Ok(())
}
async fn read_and_put_blocks<E: Into<GarageError>, S: Stream<Item = Result<Bytes, E>> + Unpin>(
async fn read_and_put_blocks<E: Into<Error>, S: Stream<Item = Result<Bytes, E>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
@ -207,9 +223,9 @@ async fn read_and_put_blocks<E: Into<GarageError>, S: Stream<Item = Result<Bytes
loop {
let (_, _, next_block) = futures::try_join!(
put_curr_block,
put_curr_version_block,
chunker.next().map_err(Into::into),
put_curr_block.map_err(Into::into),
put_curr_version_block.map_err(Into::into),
chunker.next().map_err(Into::into)
)?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
@ -270,6 +286,152 @@ async fn put_block_meta(
Ok(())
}
mod payload {
#[derive(Debug)]
pub struct Header {
pub size: usize,
pub signature: Box<[u8]>,
}
impl Header {
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> {
use nom::bytes::complete::tag;
use nom::character::complete::hex_digit1;
use nom::combinator::map_res;
use nom::number::complete::hex_u32;
let (input, size) = hex_u32(input)?;
let (input, _) = tag(";")(input)?;
let (input, _) = tag("chunk-signature=")(input)?;
let (input, data) = map_res(hex_digit1, hex::decode)(input)?;
let (input, _) = tag("\r\n")(input)?;
let header = Header {
size: size as usize,
signature: data.into_boxed_slice(),
};
Ok((input, header))
}
}
}
enum SignedPayloadChunkerError<StreamE> {
Stream(StreamE),
Message(String),
}
impl<StreamE> From<SignedPayloadChunkerError<StreamE>> for Error
where
StreamE: Into<Error>,
{
fn from(err: SignedPayloadChunkerError<StreamE>) -> Self {
match err {
SignedPayloadChunkerError::Stream(e) => e.into(),
SignedPayloadChunkerError::Message(e) => {
Error::BadRequest(format!("Chunk format error: {}", e))
}
}
}
}
impl<E, I> From<nom::error::Error<I>> for SignedPayloadChunkerError<E> {
fn from(err: nom::error::Error<I>) -> Self {
Self::Message(err.code.description().into())
}
}
#[pin_project::pin_project]
struct SignedPayloadChunker<S, E>
where
S: Stream<Item = Result<Bytes, E>>,
{
#[pin]
stream: S,
buf: bytes::BytesMut,
}
impl<S, E> SignedPayloadChunker<S, E>
where
S: Stream<Item = Result<Bytes, E>>,
{
fn new(stream: S) -> Self {
Self {
stream,
buf: bytes::BytesMut::new(),
}
}
}
impl<S, E> Stream for SignedPayloadChunker<S, E>
where
S: Stream<Item = Result<Bytes, E>> + Unpin,
{
type Item = Result<Bytes, SignedPayloadChunkerError<E>>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
use std::task::Poll;
use nom::bytes::complete::{tag, take};
let mut this = self.project();
macro_rules! parse_try {
($expr:expr) => {
match $expr {
Ok(value) => value,
Err(nom::Err::Incomplete(_)) => continue,
Err(nom::Err::Error(e @ nom::error::Error { .. }))
| Err(nom::Err::Failure(e)) => return Poll::Ready(Some(Err(e.into()))),
}
};
}
loop {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(SignedPayloadChunkerError::Stream(e))))
}
None => {
if this.buf.is_empty() {
return Poll::Ready(None);
}
}
}
let input: &[u8] = this.buf;
let (input, header) = parse_try!(payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
this.buf.clear();
return Poll::Ready(None);
}
let (input, data) = parse_try!(take(header.size)(input));
let (input, _) = parse_try!(tag("\r\n")(input));
let data = Bytes::from(data.to_vec());
*this.buf = input.into();
return Poll::Ready(Some(Ok(data)));
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
struct StreamChunker<S: Stream<Item = Result<Bytes, E>>, E> {
stream: S,
read_all: bool,
@ -292,7 +454,7 @@ impl<S: Stream<Item = Result<Bytes, E>> + Unpin, E> StreamChunker<S, E> {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
self.buf.extend(&bytes[..]);
self.buf.extend(bytes);
} else {
self.read_all = true;
}

View file

@ -97,13 +97,13 @@ pub async fn check_signature(
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
None
} else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
} else {
let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?;
Some(
Hash::try_from(&bytes[..])
.ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?,
)
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?)
};
Ok((key, content_sha256))