diff --git a/Cargo.lock b/Cargo.lock index 9e0a1bb0..a36cdf83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -660,6 +660,7 @@ dependencies = [ "garage_util 0.1.0", "hex", "http", + "httpdate", "hyper", "idna", "log", diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 252ee58d..819b51c1 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -42,3 +42,5 @@ webpki = "0.21" roxmltree = "0.11" idna = "0.2" + +httpdate = "0.3" diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 7172f222..8a222738 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -2,17 +2,23 @@ use std::borrow::Cow; use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; +use std::time::{Duration, UNIX_EPOCH}; use futures::future::Future; +use futures::stream::*; -use hyper::header::HOST; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use hyper::{ + header::HOST, + body::Bytes, + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + Body, Request, Response, Server, StatusCode}; use idna::domain_to_unicode; use garage_model::garage::Garage; +use garage_model::object_table::*; +use garage_table::EmptyKey; use garage_util::error::Error as GarageError; use crate::error::*; @@ -90,7 +96,102 @@ async fn serve_file(garage: Arc, req: Request) -> Result x, + _ => unreachable!(), + }; + + // Get metadata from version + let last_v_meta = match last_v_data { + ObjectVersionData::DeleteMarker => return Err(Error::NotFound), + ObjectVersionData::Inline(meta, _) => meta, + ObjectVersionData::FirstBlock(meta, _) => meta, + }; + + // @FIXME Support range + + + // Set headers + let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK); + + + // Stream body + match &last_v_data { + ObjectVersionData::DeleteMarker => unreachable!(), + ObjectVersionData::Inline(_, bytes) => { + let body: Body = Body::from(bytes.to_vec()); + Ok(resp_builder.body(body)?) + } + ObjectVersionData::FirstBlock(_, first_block_hash) => { + let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); + + let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let version = version.ok_or(Error::NotFound)?; + + let mut blocks = version + .blocks() + .iter() + .map(|vb| (vb.hash, None)) + .collect::>(); + blocks[0].1 = Some(first_block); + + let body_stream = futures::stream::iter(blocks) + .map(move |(hash, data_opt)| { + let garage = garage.clone(); + async move { + if let Some(data) = data_opt { + Ok(Bytes::from(data)) + } else { + garage + .block_manager + .rpc_get_block(&hash) + .await + .map(Bytes::from) + } + } + }) + .buffered(2); + //let body: Body = Box::new(StreamBody::new(Box::pin(body_stream))); + let body = hyper::body::Body::wrap_stream(body_stream); + Ok(resp_builder.body(body)?) + } + } +} + +// Copied from api/s3_get.rs +fn object_headers( + version: &ObjectVersion, + version_meta: &ObjectVersionMeta, +) -> http::response::Builder { + let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); + let date_str = httpdate::fmt_http_date(date); + + let mut resp = Response::builder() + .header( + "Content-Type", + version_meta.headers.content_type.to_string(), + ) + .header("Content-Length", format!("{}", version_meta.size)) + .header("ETag", version_meta.etag.to_string()) + .header("Last-Modified", date_str) + .header("Accept-Ranges", format!("bytes")); + + for (k, v) in version_meta.headers.other.iter() { + resp = resp.header(k, v.to_string()); + } + + resp } /// Extract host from the authority section given by the HTTP host header