diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 93271579..b243d982 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -80,11 +80,11 @@ pub async fn handle_delete_objects( req: Request, content_sha256: Option, ) -> Result, Error> { - let content_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 00771638..7ebbdb12 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,9 +1,7 @@ use std::collections::{BTreeMap, VecDeque}; -use std::pin::Pin; use std::sync::Arc; use chrono::{DateTime, NaiveDateTime, Utc}; -use futures::task; use futures::{prelude::*, TryFutureExt}; use hyper::body::{Body, Bytes}; use hyper::{Request, Response}; @@ -24,8 +22,9 @@ use garage_model::version_table::*; use crate::error::*; use crate::s3_xml; +use crate::signature::streaming::SignedPayloadStream; +use crate::signature::verify_signed_content; use crate::signature::LONG_DATETIME; -use crate::signature::{streaming::check_streaming_payload_signature, verify_signed_content}; pub async fn handle_put( garage: Arc, @@ -59,6 +58,7 @@ pub async fn handle_put( // Parse body of uploaded file let (head, body) = req.into_parts(); + let body = body.map_err(Error::from); let body = if let Some(signature) = payload_seed_signature { let secret_key = &api_key @@ -76,11 +76,11 @@ pub async fn handle_put( NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; let date: DateTime = DateTime::from_utc(date, Utc); - SignedPayloadChunker::new(body, garage.clone(), date, secret_key, signature) + SignedPayloadStream::new(body, garage.clone(), date, secret_key, signature)? .map_err(Error::from) .boxed() } else { - body.map_err(Error::from).boxed() + body.boxed() }; let mut chunker = StreamChunker::new(body, garage.config.block_size); @@ -217,13 +217,13 @@ fn ensure_checksum_matches( Ok(()) } -async fn read_and_put_blocks, S: Stream> + Unpin>( +async fn read_and_put_blocks> + Unpin>( garage: &Garage, version: &Version, part_number: u64, first_block: Vec, first_block_hash: Hash, - chunker: &mut StreamChunker, + chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { let mut md5hasher = Md5::new(); let mut sha256hasher = Sha256::new(); @@ -245,9 +245,9 @@ async fn read_and_put_blocks, S: Stream> loop { let (_, _, next_block) = futures::try_join!( - put_curr_block.map_err(Into::into), - put_curr_version_block.map_err(Into::into), - chunker.next().map_err(Into::into) + put_curr_block.map_err(Error::from), + put_curr_version_block.map_err(Error::from), + chunker.next(), )?; if let Some(block) = next_block { md5hasher.update(&block[..]); @@ -308,214 +308,14 @@ async fn put_block_meta( Ok(()) } -mod payload { - use std::fmt; - - pub struct Header { - pub size: usize, - pub signature: Box<[u8]>, - } - - impl Header { - pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self> { - use nom::bytes::streaming::tag; - use nom::character::streaming::hex_digit1; - use nom::combinator::map_res; - use nom::number::streaming::hex_u32; - - let (input, size) = hex_u32(input)?; - let (input, _) = tag(";")(input)?; - - let (input, _) = tag("chunk-signature=")(input)?; - let (input, data) = map_res(hex_digit1, hex::decode)(input)?; - - let (input, _) = tag("\r\n")(input)?; - - let header = Header { - size: size as usize, - signature: data.into_boxed_slice(), - }; - - Ok((input, header)) - } - } - - impl fmt::Debug for Header { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Header") - .field("size", &self.size) - .field("signature", &hex::encode(&self.signature)) - .finish() - } - } -} - -enum SignedPayloadChunkerError { - Stream(StreamE), - InvalidSignature, - Message(String), -} - -impl SignedPayloadChunkerError { - fn message(msg: &str) -> Self { - SignedPayloadChunkerError::Message(msg.into()) - } -} - -impl From> for Error -where - StreamE: Into, -{ - fn from(err: SignedPayloadChunkerError) -> Self { - match err { - SignedPayloadChunkerError::Stream(e) => e.into(), - SignedPayloadChunkerError::InvalidSignature => { - Error::BadRequest("Invalid payload signature".into()) - } - SignedPayloadChunkerError::Message(e) => { - Error::BadRequest(format!("Chunk format error: {}", e)) - } - } - } -} - -impl From> for SignedPayloadChunkerError { - fn from(err: nom::error::Error) -> Self { - Self::message(err.code.description()) - } -} - -#[pin_project::pin_project] -struct SignedPayloadChunker -where - S: Stream>, -{ - #[pin] - stream: S, - buf: bytes::BytesMut, - garage: Arc, - datetime: DateTime, - secret_key: String, - previous_signature: Hash, -} - -impl SignedPayloadChunker -where - S: Stream>, -{ - fn new( - stream: S, - garage: Arc, - datetime: DateTime, - secret_key: &str, - seed_signature: Hash, - ) -> Self { - Self { - stream, - buf: bytes::BytesMut::new(), - garage, - datetime, - secret_key: secret_key.into(), - previous_signature: seed_signature, - } - } -} - -impl Stream for SignedPayloadChunker -where - S: Stream> + Unpin, -{ - type Item = Result>; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> task::Poll> { - use std::task::Poll; - - use nom::bytes::streaming::{tag, take}; - - let mut this = self.project(); - - macro_rules! try_parse { - ($expr:expr) => { - match $expr { - Ok(value) => Ok(value), - Err(nom::Err::Incomplete(_)) => continue, - Err(nom::Err::Error(e @ nom::error::Error { .. })) - | Err(nom::Err::Failure(e)) => Err(e), - }? - }; - } - - loop { - match futures::ready!(this.stream.as_mut().poll_next(cx)) { - Some(Ok(bytes)) => { - this.buf.extend(bytes); - } - Some(Err(e)) => { - return Poll::Ready(Some(Err(SignedPayloadChunkerError::Stream(e)))) - } - None => { - if this.buf.is_empty() { - return Poll::Ready(None); - } - } - } - - let input: &[u8] = this.buf; - - let (input, header) = try_parse!(payload::Header::parse(input)); - let signature = Hash::try_from(&*header.signature) - .ok_or_else(|| SignedPayloadChunkerError::message("Invalid signature"))?; - - // 0-sized chunk is the last - if header.size == 0 { - this.buf.clear(); - return Poll::Ready(None); - } - - let (input, data) = try_parse!(take(header.size)(input)); - let (input, _) = try_parse!(tag("\r\n")(input)); - - let data = Bytes::from(data.to_vec()); - let data_sha256sum = sha256sum(&data); - - let expected_signature = check_streaming_payload_signature( - this.garage, - this.secret_key, - *this.datetime, - *this.previous_signature, - data_sha256sum, - ) - .map_err(|e| { - SignedPayloadChunkerError::Message(format!("Could not build signature: {}", e)) - })?; - - if signature != expected_signature { - return Poll::Ready(Some(Err(SignedPayloadChunkerError::InvalidSignature))); - } - - *this.buf = input.into(); - *this.previous_signature = signature; - - return Poll::Ready(Some(Ok(data))); - } - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -struct StreamChunker>, E> { +struct StreamChunker>> { stream: S, read_all: bool, block_size: usize, buf: VecDeque, } -impl> + Unpin, E> StreamChunker { +impl> + Unpin> StreamChunker { fn new(stream: S, block_size: usize) -> Self { Self { stream, @@ -525,7 +325,7 @@ impl> + Unpin, E> StreamChunker { } } - async fn next(&mut self) -> Result>, E> { + async fn next(&mut self) -> Result>, Error> { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; @@ -612,12 +412,20 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let mut chunker = StreamChunker::new(req.into_body(), garage.config.block_size); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (object, version, first_block) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - chunker.next().map_err(GarageError::from), + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .version_table + .get(&version_uuid, &EmptyKey) + .map_err(Error::from), + chunker.next(), )?; // Check object is valid and multipart block can be accepted diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index b662c0b5..ab95d0af 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -43,11 +43,11 @@ pub async fn handle_put_website( req: Request, content_sha256: Option, ) -> Result, Error> { - let content_sha256 = - content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?; - let body = hyper::body::to_bytes(req.into_body()).await?; - verify_signed_content(content_sha256, &body[..])?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } let mut bucket = garage .bucket_table diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 35c93f5a..00fc5572 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,10 +1,17 @@ +use std::pin::Pin; +use std::sync::Arc; + use chrono::{DateTime, Utc}; +use futures::prelude::*; +use futures::task; +use hyper::body::Bytes; use garage_model::garage::Garage; use garage_util::data::Hash; use hmac::Mac; -use super::signing_hmac; +use super::sha256sum; +use super::HmacSha256; use super::{LONG_DATETIME, SHORT_DATE}; use crate::error::*; @@ -13,9 +20,9 @@ use crate::error::*; const EMPTY_STRING_HEX_DIGEST: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; -pub fn check_streaming_payload_signature( +fn compute_streaming_payload_signature( garage: &Garage, - secret_key: &str, + signing_hmac: &HmacSha256, date: DateTime, previous_signature: Hash, content_sha256: Hash, @@ -36,9 +43,223 @@ pub fn check_streaming_payload_signature( ] .join("\n"); - let mut hmac = signing_hmac(&date, secret_key, &garage.config.s3_api.s3_region, "s3") - .ok_or_internal_error("Unable to build signing HMAC")?; + let mut hmac = signing_hmac.clone(); hmac.update(string_to_sign.as_bytes()); - Hash::try_from(&hmac.finalize().into_bytes()).ok_or_bad_request("Invalid signature") + Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature") +} + +mod payload { + use garage_util::data::Hash; + + pub enum Error { + Parser(nom::error::Error), + BadSignature, + } + + impl Error { + pub fn description(&self) -> &str { + match *self { + Error::Parser(ref e) => e.code.description(), + Error::BadSignature => "Bad signature", + } + } + } + + #[derive(Debug, Clone)] + pub struct Header { + pub size: usize, + pub signature: Hash, + } + + impl Header { + pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> { + use nom::bytes::streaming::tag; + use nom::character::streaming::hex_digit1; + use nom::combinator::map_res; + use nom::number::streaming::hex_u32; + + macro_rules! try_parse { + ($expr:expr) => { + $expr.map_err(|e| e.map(Error::Parser))? + }; + } + + let (input, size) = try_parse!(hex_u32(input)); + let (input, _) = try_parse!(tag(";")(input)); + + let (input, _) = try_parse!(tag("chunk-signature=")(input)); + let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input)); + let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?; + + let (input, _) = try_parse!(tag("\r\n")(input)); + + let header = Header { + size: size as usize, + signature, + }; + + Ok((input, header)) + } + } +} + +pub enum SignedPayloadStreamError { + Stream(Error), + InvalidSignature, + Message(String), +} + +impl SignedPayloadStreamError { + fn message(msg: &str) -> Self { + SignedPayloadStreamError::Message(msg.into()) + } +} + +impl From for Error { + fn from(err: SignedPayloadStreamError) -> Self { + match err { + SignedPayloadStreamError::Stream(e) => e, + SignedPayloadStreamError::InvalidSignature => { + Error::BadRequest("Invalid payload signature".into()) + } + SignedPayloadStreamError::Message(e) => { + Error::BadRequest(format!("Chunk format error: {}", e)) + } + } + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: payload::Error) -> Self { + Self::message(err.description()) + } +} + +impl From> for SignedPayloadStreamError { + fn from(err: nom::error::Error) -> Self { + Self::message(err.code.description()) + } +} + +#[pin_project::pin_project] +pub struct SignedPayloadStream +where + S: Stream>, +{ + #[pin] + stream: S, + buf: bytes::BytesMut, + garage: Arc, + datetime: DateTime, + signing_hmac: HmacSha256, + previous_signature: Hash, +} + +impl SignedPayloadStream +where + S: Stream>, +{ + pub fn new( + stream: S, + garage: Arc, + datetime: DateTime, + secret_key: &str, + seed_signature: Hash, + ) -> Result { + let signing_hmac = + super::signing_hmac(&datetime, secret_key, &garage.config.s3_api.s3_region, "s3") + .ok_or_internal_error("Could not compute signing HMAC")?; + + Ok(Self { + stream, + buf: bytes::BytesMut::new(), + garage, + datetime, + signing_hmac, + previous_signature: seed_signature, + }) + } +} + +impl Stream for SignedPayloadStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + use std::task::Poll; + + use nom::bytes::streaming::{tag, take}; + + let mut this = self.project(); + + macro_rules! try_parse { + ($expr:expr) => { + match $expr { + Ok(value) => Ok(value), + Err(nom::Err::Incomplete(_)) => continue, + Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => Err(e), + }? + }; + } + + loop { + match futures::ready!(this.stream.as_mut().poll_next(cx)) { + Some(Ok(bytes)) => { + this.buf.extend(bytes); + } + Some(Err(e)) => return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e)))), + None => { + if this.buf.is_empty() { + return Poll::Ready(None); + } + } + } + + let input: &[u8] = this.buf; + + let (input, header) = try_parse!(payload::Header::parse(input)); + + // 0-sized chunk is the last + if header.size == 0 { + this.buf.clear(); + return Poll::Ready(None); + } + + let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input)); + let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input)); + + let data = Bytes::from(data.to_vec()); + let data_sha256sum = sha256sum(&data); + + let expected_signature = compute_streaming_payload_signature( + this.garage, + this.signing_hmac, + *this.datetime, + *this.previous_signature, + data_sha256sum, + ) + .map_err(|e| { + SignedPayloadStreamError::Message(format!("Could not build signature: {}", e)) + })?; + + if header.signature != expected_signature { + return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature))); + } + + *this.buf = input.into(); + *this.previous_signature = header.signature; + + return Poll::Ready(Some(Ok(data))); + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } }