garage_api(fixup): Fixups from reviews
This commit is contained in:
parent
f0cb931a45
commit
2b034b7c4e
4 changed files with 260 additions and 231 deletions
|
@ -80,11 +80,11 @@ pub async fn handle_delete_objects(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let content_sha256 =
|
|
||||||
content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?;
|
|
||||||
|
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
use futures::task;
|
|
||||||
use futures::{prelude::*, TryFutureExt};
|
use futures::{prelude::*, TryFutureExt};
|
||||||
use hyper::body::{Body, Bytes};
|
use hyper::body::{Body, Bytes};
|
||||||
use hyper::{Request, Response};
|
use hyper::{Request, Response};
|
||||||
|
@ -24,8 +22,9 @@ use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::s3_xml;
|
use crate::s3_xml;
|
||||||
|
use crate::signature::streaming::SignedPayloadStream;
|
||||||
|
use crate::signature::verify_signed_content;
|
||||||
use crate::signature::LONG_DATETIME;
|
use crate::signature::LONG_DATETIME;
|
||||||
use crate::signature::{streaming::check_streaming_payload_signature, verify_signed_content};
|
|
||||||
|
|
||||||
pub async fn handle_put(
|
pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
|
@ -59,6 +58,7 @@ pub async fn handle_put(
|
||||||
|
|
||||||
// Parse body of uploaded file
|
// Parse body of uploaded file
|
||||||
let (head, body) = req.into_parts();
|
let (head, body) = req.into_parts();
|
||||||
|
let body = body.map_err(Error::from);
|
||||||
|
|
||||||
let body = if let Some(signature) = payload_seed_signature {
|
let body = if let Some(signature) = payload_seed_signature {
|
||||||
let secret_key = &api_key
|
let secret_key = &api_key
|
||||||
|
@ -76,11 +76,11 @@ pub async fn handle_put(
|
||||||
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
|
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
|
||||||
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
||||||
|
|
||||||
SignedPayloadChunker::new(body, garage.clone(), date, secret_key, signature)
|
SignedPayloadStream::new(body, garage.clone(), date, secret_key, signature)?
|
||||||
.map_err(Error::from)
|
.map_err(Error::from)
|
||||||
.boxed()
|
.boxed()
|
||||||
} else {
|
} else {
|
||||||
body.map_err(Error::from).boxed()
|
body.boxed()
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
|
@ -217,13 +217,13 @@ fn ensure_checksum_matches(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_and_put_blocks<E: Into<Error>, S: Stream<Item = Result<Bytes, E>> + Unpin>(
|
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
first_block: Vec<u8>,
|
first_block: Vec<u8>,
|
||||||
first_block_hash: Hash,
|
first_block_hash: Hash,
|
||||||
chunker: &mut StreamChunker<S, E>,
|
chunker: &mut StreamChunker<S>,
|
||||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
||||||
let mut md5hasher = Md5::new();
|
let mut md5hasher = Md5::new();
|
||||||
let mut sha256hasher = Sha256::new();
|
let mut sha256hasher = Sha256::new();
|
||||||
|
@ -245,9 +245,9 @@ async fn read_and_put_blocks<E: Into<Error>, S: Stream<Item = Result<Bytes, E>>
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (_, _, next_block) = futures::try_join!(
|
let (_, _, next_block) = futures::try_join!(
|
||||||
put_curr_block.map_err(Into::into),
|
put_curr_block.map_err(Error::from),
|
||||||
put_curr_version_block.map_err(Into::into),
|
put_curr_version_block.map_err(Error::from),
|
||||||
chunker.next().map_err(Into::into)
|
chunker.next(),
|
||||||
)?;
|
)?;
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
md5hasher.update(&block[..]);
|
md5hasher.update(&block[..]);
|
||||||
|
@ -308,214 +308,14 @@ async fn put_block_meta(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
mod payload {
|
struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
pub struct Header {
|
|
||||||
pub size: usize,
|
|
||||||
pub signature: Box<[u8]>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Header {
|
|
||||||
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> {
|
|
||||||
use nom::bytes::streaming::tag;
|
|
||||||
use nom::character::streaming::hex_digit1;
|
|
||||||
use nom::combinator::map_res;
|
|
||||||
use nom::number::streaming::hex_u32;
|
|
||||||
|
|
||||||
let (input, size) = hex_u32(input)?;
|
|
||||||
let (input, _) = tag(";")(input)?;
|
|
||||||
|
|
||||||
let (input, _) = tag("chunk-signature=")(input)?;
|
|
||||||
let (input, data) = map_res(hex_digit1, hex::decode)(input)?;
|
|
||||||
|
|
||||||
let (input, _) = tag("\r\n")(input)?;
|
|
||||||
|
|
||||||
let header = Header {
|
|
||||||
size: size as usize,
|
|
||||||
signature: data.into_boxed_slice(),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok((input, header))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Header {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
f.debug_struct("Header")
|
|
||||||
.field("size", &self.size)
|
|
||||||
.field("signature", &hex::encode(&self.signature))
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
enum SignedPayloadChunkerError<StreamE> {
|
|
||||||
Stream(StreamE),
|
|
||||||
InvalidSignature,
|
|
||||||
Message(String),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<StreamE> SignedPayloadChunkerError<StreamE> {
|
|
||||||
fn message(msg: &str) -> Self {
|
|
||||||
SignedPayloadChunkerError::Message(msg.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<StreamE> From<SignedPayloadChunkerError<StreamE>> for Error
|
|
||||||
where
|
|
||||||
StreamE: Into<Error>,
|
|
||||||
{
|
|
||||||
fn from(err: SignedPayloadChunkerError<StreamE>) -> Self {
|
|
||||||
match err {
|
|
||||||
SignedPayloadChunkerError::Stream(e) => e.into(),
|
|
||||||
SignedPayloadChunkerError::InvalidSignature => {
|
|
||||||
Error::BadRequest("Invalid payload signature".into())
|
|
||||||
}
|
|
||||||
SignedPayloadChunkerError::Message(e) => {
|
|
||||||
Error::BadRequest(format!("Chunk format error: {}", e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<E, I> From<nom::error::Error<I>> for SignedPayloadChunkerError<E> {
|
|
||||||
fn from(err: nom::error::Error<I>) -> Self {
|
|
||||||
Self::message(err.code.description())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[pin_project::pin_project]
|
|
||||||
struct SignedPayloadChunker<S, E>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>>,
|
|
||||||
{
|
|
||||||
#[pin]
|
|
||||||
stream: S,
|
|
||||||
buf: bytes::BytesMut,
|
|
||||||
garage: Arc<Garage>,
|
|
||||||
datetime: DateTime<Utc>,
|
|
||||||
secret_key: String,
|
|
||||||
previous_signature: Hash,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, E> SignedPayloadChunker<S, E>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>>,
|
|
||||||
{
|
|
||||||
fn new(
|
|
||||||
stream: S,
|
|
||||||
garage: Arc<Garage>,
|
|
||||||
datetime: DateTime<Utc>,
|
|
||||||
secret_key: &str,
|
|
||||||
seed_signature: Hash,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
stream,
|
|
||||||
buf: bytes::BytesMut::new(),
|
|
||||||
garage,
|
|
||||||
datetime,
|
|
||||||
secret_key: secret_key.into(),
|
|
||||||
previous_signature: seed_signature,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, E> Stream for SignedPayloadChunker<S, E>
|
|
||||||
where
|
|
||||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
|
||||||
{
|
|
||||||
type Item = Result<Bytes, SignedPayloadChunkerError<E>>;
|
|
||||||
|
|
||||||
fn poll_next(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut task::Context<'_>,
|
|
||||||
) -> task::Poll<Option<Self::Item>> {
|
|
||||||
use std::task::Poll;
|
|
||||||
|
|
||||||
use nom::bytes::streaming::{tag, take};
|
|
||||||
|
|
||||||
let mut this = self.project();
|
|
||||||
|
|
||||||
macro_rules! try_parse {
|
|
||||||
($expr:expr) => {
|
|
||||||
match $expr {
|
|
||||||
Ok(value) => Ok(value),
|
|
||||||
Err(nom::Err::Incomplete(_)) => continue,
|
|
||||||
Err(nom::Err::Error(e @ nom::error::Error { .. }))
|
|
||||||
| Err(nom::Err::Failure(e)) => Err(e),
|
|
||||||
}?
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
|
|
||||||
Some(Ok(bytes)) => {
|
|
||||||
this.buf.extend(bytes);
|
|
||||||
}
|
|
||||||
Some(Err(e)) => {
|
|
||||||
return Poll::Ready(Some(Err(SignedPayloadChunkerError::Stream(e))))
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
if this.buf.is_empty() {
|
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let input: &[u8] = this.buf;
|
|
||||||
|
|
||||||
let (input, header) = try_parse!(payload::Header::parse(input));
|
|
||||||
let signature = Hash::try_from(&*header.signature)
|
|
||||||
.ok_or_else(|| SignedPayloadChunkerError::message("Invalid signature"))?;
|
|
||||||
|
|
||||||
// 0-sized chunk is the last
|
|
||||||
if header.size == 0 {
|
|
||||||
this.buf.clear();
|
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
|
|
||||||
let (input, data) = try_parse!(take(header.size)(input));
|
|
||||||
let (input, _) = try_parse!(tag("\r\n")(input));
|
|
||||||
|
|
||||||
let data = Bytes::from(data.to_vec());
|
|
||||||
let data_sha256sum = sha256sum(&data);
|
|
||||||
|
|
||||||
let expected_signature = check_streaming_payload_signature(
|
|
||||||
this.garage,
|
|
||||||
this.secret_key,
|
|
||||||
*this.datetime,
|
|
||||||
*this.previous_signature,
|
|
||||||
data_sha256sum,
|
|
||||||
)
|
|
||||||
.map_err(|e| {
|
|
||||||
SignedPayloadChunkerError::Message(format!("Could not build signature: {}", e))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if signature != expected_signature {
|
|
||||||
return Poll::Ready(Some(Err(SignedPayloadChunkerError::InvalidSignature)));
|
|
||||||
}
|
|
||||||
|
|
||||||
*this.buf = input.into();
|
|
||||||
*this.previous_signature = signature;
|
|
||||||
|
|
||||||
return Poll::Ready(Some(Ok(data)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
||||||
self.stream.size_hint()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct StreamChunker<S: Stream<Item = Result<Bytes, E>>, E> {
|
|
||||||
stream: S,
|
stream: S,
|
||||||
read_all: bool,
|
read_all: bool,
|
||||||
block_size: usize,
|
block_size: usize,
|
||||||
buf: VecDeque<u8>,
|
buf: VecDeque<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Stream<Item = Result<Bytes, E>> + Unpin, E> StreamChunker<S, E> {
|
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
|
||||||
fn new(stream: S, block_size: usize) -> Self {
|
fn new(stream: S, block_size: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stream,
|
stream,
|
||||||
|
@ -525,7 +325,7 @@ impl<S: Stream<Item = Result<Bytes, E>> + Unpin, E> StreamChunker<S, E> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, E> {
|
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
|
||||||
while !self.read_all && self.buf.len() < self.block_size {
|
while !self.read_all && self.buf.len() < self.block_size {
|
||||||
if let Some(block) = self.stream.next().await {
|
if let Some(block) = self.stream.next().await {
|
||||||
let bytes = block?;
|
let bytes = block?;
|
||||||
|
@ -612,12 +412,20 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
// Read first chuck, and at the same time try to get object to see if it exists
|
// Read first chuck, and at the same time try to get object to see if it exists
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let mut chunker = StreamChunker::new(req.into_body(), garage.config.block_size);
|
|
||||||
|
let body = req.into_body().map_err(Error::from);
|
||||||
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
|
|
||||||
let (object, version, first_block) = futures::try_join!(
|
let (object, version, first_block) = futures::try_join!(
|
||||||
garage.object_table.get(&bucket_id, &key),
|
garage
|
||||||
garage.version_table.get(&version_uuid, &EmptyKey),
|
.object_table
|
||||||
chunker.next().map_err(GarageError::from),
|
.get(&bucket_id, &key)
|
||||||
|
.map_err(Error::from),
|
||||||
|
garage
|
||||||
|
.version_table
|
||||||
|
.get(&version_uuid, &EmptyKey)
|
||||||
|
.map_err(Error::from),
|
||||||
|
chunker.next(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Check object is valid and multipart block can be accepted
|
// Check object is valid and multipart block can be accepted
|
||||||
|
|
|
@ -43,11 +43,11 @@ pub async fn handle_put_website(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let content_sha256 =
|
|
||||||
content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?;
|
|
||||||
|
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let mut bucket = garage
|
let mut bucket = garage
|
||||||
.bucket_table
|
.bucket_table
|
||||||
|
|
|
@ -1,10 +1,17 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::task;
|
||||||
|
use hyper::body::Bytes;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_util::data::Hash;
|
use garage_util::data::Hash;
|
||||||
use hmac::Mac;
|
use hmac::Mac;
|
||||||
|
|
||||||
use super::signing_hmac;
|
use super::sha256sum;
|
||||||
|
use super::HmacSha256;
|
||||||
use super::{LONG_DATETIME, SHORT_DATE};
|
use super::{LONG_DATETIME, SHORT_DATE};
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
@ -13,9 +20,9 @@ use crate::error::*;
|
||||||
const EMPTY_STRING_HEX_DIGEST: &str =
|
const EMPTY_STRING_HEX_DIGEST: &str =
|
||||||
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
|
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
|
||||||
|
|
||||||
pub fn check_streaming_payload_signature(
|
fn compute_streaming_payload_signature(
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
secret_key: &str,
|
signing_hmac: &HmacSha256,
|
||||||
date: DateTime<Utc>,
|
date: DateTime<Utc>,
|
||||||
previous_signature: Hash,
|
previous_signature: Hash,
|
||||||
content_sha256: Hash,
|
content_sha256: Hash,
|
||||||
|
@ -36,9 +43,223 @@ pub fn check_streaming_payload_signature(
|
||||||
]
|
]
|
||||||
.join("\n");
|
.join("\n");
|
||||||
|
|
||||||
let mut hmac = signing_hmac(&date, secret_key, &garage.config.s3_api.s3_region, "s3")
|
let mut hmac = signing_hmac.clone();
|
||||||
.ok_or_internal_error("Unable to build signing HMAC")?;
|
|
||||||
hmac.update(string_to_sign.as_bytes());
|
hmac.update(string_to_sign.as_bytes());
|
||||||
|
|
||||||
Hash::try_from(&hmac.finalize().into_bytes()).ok_or_bad_request("Invalid signature")
|
Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
|
||||||
|
}
|
||||||
|
|
||||||
|
mod payload {
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
|
pub enum Error<I> {
|
||||||
|
Parser(nom::error::Error<I>),
|
||||||
|
BadSignature,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> Error<I> {
|
||||||
|
pub fn description(&self) -> &str {
|
||||||
|
match *self {
|
||||||
|
Error::Parser(ref e) => e.code.description(),
|
||||||
|
Error::BadSignature => "Bad signature",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Header {
|
||||||
|
pub size: usize,
|
||||||
|
pub signature: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Header {
|
||||||
|
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
|
||||||
|
use nom::bytes::streaming::tag;
|
||||||
|
use nom::character::streaming::hex_digit1;
|
||||||
|
use nom::combinator::map_res;
|
||||||
|
use nom::number::streaming::hex_u32;
|
||||||
|
|
||||||
|
macro_rules! try_parse {
|
||||||
|
($expr:expr) => {
|
||||||
|
$expr.map_err(|e| e.map(Error::Parser))?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, size) = try_parse!(hex_u32(input));
|
||||||
|
let (input, _) = try_parse!(tag(";")(input));
|
||||||
|
|
||||||
|
let (input, _) = try_parse!(tag("chunk-signature=")(input));
|
||||||
|
let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
|
||||||
|
let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
|
||||||
|
|
||||||
|
let (input, _) = try_parse!(tag("\r\n")(input));
|
||||||
|
|
||||||
|
let header = Header {
|
||||||
|
size: size as usize,
|
||||||
|
signature,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((input, header))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum SignedPayloadStreamError {
|
||||||
|
Stream(Error),
|
||||||
|
InvalidSignature,
|
||||||
|
Message(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SignedPayloadStreamError {
|
||||||
|
fn message(msg: &str) -> Self {
|
||||||
|
SignedPayloadStreamError::Message(msg.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SignedPayloadStreamError> for Error {
|
||||||
|
fn from(err: SignedPayloadStreamError) -> Self {
|
||||||
|
match err {
|
||||||
|
SignedPayloadStreamError::Stream(e) => e,
|
||||||
|
SignedPayloadStreamError::InvalidSignature => {
|
||||||
|
Error::BadRequest("Invalid payload signature".into())
|
||||||
|
}
|
||||||
|
SignedPayloadStreamError::Message(e) => {
|
||||||
|
Error::BadRequest(format!("Chunk format error: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> From<payload::Error<I>> for SignedPayloadStreamError {
|
||||||
|
fn from(err: payload::Error<I>) -> Self {
|
||||||
|
Self::message(err.description())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
|
||||||
|
fn from(err: nom::error::Error<I>) -> Self {
|
||||||
|
Self::message(err.code.description())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>>,
|
||||||
|
{
|
||||||
|
#[pin]
|
||||||
|
stream: S,
|
||||||
|
buf: bytes::BytesMut,
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
datetime: DateTime<Utc>,
|
||||||
|
signing_hmac: HmacSha256,
|
||||||
|
previous_signature: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>>,
|
||||||
|
{
|
||||||
|
pub fn new(
|
||||||
|
stream: S,
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
datetime: DateTime<Utc>,
|
||||||
|
secret_key: &str,
|
||||||
|
seed_signature: Hash,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
let signing_hmac =
|
||||||
|
super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3")
|
||||||
|
.ok_or_internal_error("Could not compute signing HMAC")?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
stream,
|
||||||
|
buf: bytes::BytesMut::new(),
|
||||||
|
garage,
|
||||||
|
datetime,
|
||||||
|
signing_hmac,
|
||||||
|
previous_signature: seed_signature,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Stream for SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>> + Unpin,
|
||||||
|
{
|
||||||
|
type Item = Result<Bytes, SignedPayloadStreamError>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> task::Poll<Option<Self::Item>> {
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
|
use nom::bytes::streaming::{tag, take};
|
||||||
|
|
||||||
|
let mut this = self.project();
|
||||||
|
|
||||||
|
macro_rules! try_parse {
|
||||||
|
($expr:expr) => {
|
||||||
|
match $expr {
|
||||||
|
Ok(value) => Ok(value),
|
||||||
|
Err(nom::Err::Incomplete(_)) => continue,
|
||||||
|
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e),
|
||||||
|
}?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
|
||||||
|
Some(Ok(bytes)) => {
|
||||||
|
this.buf.extend(bytes);
|
||||||
|
}
|
||||||
|
Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))),
|
||||||
|
None => {
|
||||||
|
if this.buf.is_empty() {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let input: &[u8] = this.buf;
|
||||||
|
|
||||||
|
let (input, header) = try_parse!(payload::Header::parse(input));
|
||||||
|
|
||||||
|
// 0-sized chunk is the last
|
||||||
|
if header.size == 0 {
|
||||||
|
this.buf.clear();
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
|
||||||
|
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
|
||||||
|
|
||||||
|
let data = Bytes::from(data.to_vec());
|
||||||
|
let data_sha256sum = sha256sum(&data);
|
||||||
|
|
||||||
|
let expected_signature = compute_streaming_payload_signature(
|
||||||
|
this.garage,
|
||||||
|
this.signing_hmac,
|
||||||
|
*this.datetime,
|
||||||
|
*this.previous_signature,
|
||||||
|
data_sha256sum,
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if header.signature != expected_signature {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
|
||||||
|
}
|
||||||
|
|
||||||
|
*this.buf = input.into();
|
||||||
|
*this.previous_signature = header.signature;
|
||||||
|
|
||||||
|
return Poll::Ready(Some(Ok(data)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
self.stream.size_hint()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue