Update to Hyper 0.13.6 that accepts non-Sync streams in wrap_stream.
Simplifies code and makes it possible to publish on crates.io
This commit is contained in:
parent
3b0b11085e
commit
f22ecb60a8
11 changed files with 515 additions and 523 deletions
857
Cargo.lock
generated
857
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
6
Makefile
6
Makefile
|
@ -3,10 +3,12 @@ DOCKER=lxpz/garage_amd64
|
||||||
|
|
||||||
all:
|
all:
|
||||||
#cargo fmt || true
|
#cargo fmt || true
|
||||||
RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build
|
#RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build
|
||||||
|
cargo build
|
||||||
|
|
||||||
$(BIN):
|
$(BIN):
|
||||||
RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build --release
|
#RUSTFLAGS="-C link-arg=-fuse-ld=lld" cargo build --release
|
||||||
|
cargo build --release
|
||||||
|
|
||||||
$(BIN).stripped: $(BIN)
|
$(BIN).stripped: $(BIN)
|
||||||
cp $^ $@
|
cp $^ $@
|
||||||
|
|
|
@ -30,7 +30,7 @@ futures-util = "0.3"
|
||||||
tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
|
tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
|
||||||
|
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
hyper = "0.13"
|
hyper = "^0.13.6"
|
||||||
url = "2.1"
|
url = "2.1"
|
||||||
httpdate = "0.3"
|
httpdate = "0.3"
|
||||||
percent-encoding = "2.1.0"
|
percent-encoding = "2.1.0"
|
||||||
|
|
|
@ -11,7 +11,6 @@ use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
use crate::http_util::*;
|
|
||||||
use crate::signature::check_signature;
|
use crate::signature::check_signature;
|
||||||
|
|
||||||
use crate::s3_copy::*;
|
use crate::s3_copy::*;
|
||||||
|
@ -50,7 +49,7 @@ async fn handler(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
info!("{} {} {}", addr, req.method(), req.uri());
|
info!("{} {} {}", addr, req.method(), req.uri());
|
||||||
debug!("{:?}", req);
|
debug!("{:?}", req);
|
||||||
match handler_inner(garage, req).await {
|
match handler_inner(garage, req).await {
|
||||||
|
@ -59,7 +58,7 @@ async fn handler(
|
||||||
Ok(x)
|
Ok(x)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e)));
|
let body: Body = Body::from(format!("{}\n", e));
|
||||||
let mut http_error = Response::new(body);
|
let mut http_error = Response::new(body);
|
||||||
*http_error.status_mut() = e.http_status_code();
|
*http_error.status_mut() = e.http_status_code();
|
||||||
warn!("Response: error {}, {}", e.http_status_code(), e);
|
warn!("Response: error {}, {}", e.http_status_code(), e);
|
||||||
|
@ -71,7 +70,7 @@ async fn handler(
|
||||||
async fn handler_inner(
|
async fn handler_inner(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let path = req.uri().path().to_string();
|
let path = req.uri().path().to_string();
|
||||||
let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
|
let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
|
||||||
|
|
||||||
|
@ -180,7 +179,7 @@ async fn handler_inner(
|
||||||
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
|
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
|
||||||
.unwrap_or("<invalid utf8>")
|
.unwrap_or("<invalid utf8>")
|
||||||
);
|
);
|
||||||
let empty_body: BodyType = Box::new(BytesBody::from(vec![]));
|
let empty_body: Body = Body::from(vec![]);
|
||||||
let response = Response::builder()
|
let response = Response::builder()
|
||||||
.header("Location", format!("/{}", bucket))
|
.header("Location", format!("/{}", bucket))
|
||||||
.body(empty_body)
|
.body(empty_body)
|
||||||
|
@ -189,7 +188,7 @@ async fn handler_inner(
|
||||||
}
|
}
|
||||||
&Method::HEAD => {
|
&Method::HEAD => {
|
||||||
// HeadBucket
|
// HeadBucket
|
||||||
let empty_body: BodyType = Box::new(BytesBody::from(vec![]));
|
let empty_body: Body = Body::from(vec![]);
|
||||||
let response = Response::builder().body(empty_body).unwrap();
|
let response = Response::builder().body(empty_body).unwrap();
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,88 +0,0 @@
|
||||||
use core::pin::Pin;
|
|
||||||
use core::task::{Context, Poll};
|
|
||||||
|
|
||||||
use futures::ready;
|
|
||||||
use futures::stream::*;
|
|
||||||
use hyper::body::{Bytes, HttpBody};
|
|
||||||
|
|
||||||
use garage_util::error::Error;
|
|
||||||
|
|
||||||
pub type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>;
|
|
||||||
|
|
||||||
type StreamType = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
|
|
||||||
|
|
||||||
pub struct StreamBody {
|
|
||||||
stream: StreamType,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StreamBody {
|
|
||||||
pub fn new(stream: StreamType) -> Self {
|
|
||||||
Self { stream }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HttpBody for StreamBody {
|
|
||||||
type Data = Bytes;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll_data(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context,
|
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
||||||
match ready!(self.stream.as_mut().poll_next(cx)) {
|
|
||||||
Some(res) => Poll::Ready(Some(res)),
|
|
||||||
None => Poll::Ready(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_trailers(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_cx: &mut Context,
|
|
||||||
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
|
|
||||||
Poll::Ready(Ok(None))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct BytesBody {
|
|
||||||
bytes: Option<Bytes>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BytesBody {
|
|
||||||
pub fn new(bytes: Bytes) -> Self {
|
|
||||||
Self { bytes: Some(bytes) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HttpBody for BytesBody {
|
|
||||||
type Data = Bytes;
|
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll_data(
|
|
||||||
mut self: Pin<&mut Self>,
|
|
||||||
_cx: &mut Context,
|
|
||||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
||||||
Poll::Ready(self.bytes.take().map(Ok))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_trailers(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_cx: &mut Context,
|
|
||||||
) -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Self::Error>> {
|
|
||||||
Poll::Ready(Ok(None))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<String> for BytesBody {
|
|
||||||
fn from(x: String) -> BytesBody {
|
|
||||||
Self::new(Bytes::from(x))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl From<Vec<u8>> for BytesBody {
|
|
||||||
fn from(x: Vec<u8>) -> BytesBody {
|
|
||||||
Self::new(Bytes::from(x))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn empty_body() -> BodyType {
|
|
||||||
Box::new(BytesBody::from(vec![]))
|
|
||||||
}
|
|
|
@ -2,7 +2,6 @@
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
pub mod encoding;
|
pub mod encoding;
|
||||||
pub mod http_util;
|
|
||||||
|
|
||||||
pub mod api_server;
|
pub mod api_server;
|
||||||
pub mod signature;
|
pub mod signature;
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::fmt::Write;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{SecondsFormat, Utc};
|
use chrono::{SecondsFormat, Utc};
|
||||||
use hyper::Response;
|
use hyper::{Body, Response};
|
||||||
|
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -13,15 +13,13 @@ use garage_model::garage::Garage;
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
use garage_model::version_table::*;
|
use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::http_util::*;
|
|
||||||
|
|
||||||
pub async fn handle_copy(
|
pub async fn handle_copy(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
dest_bucket: &str,
|
dest_bucket: &str,
|
||||||
dest_key: &str,
|
dest_key: &str,
|
||||||
source_bucket: &str,
|
source_bucket: &str,
|
||||||
source_key: &str,
|
source_key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let source_object = match garage
|
let source_object = match garage
|
||||||
.object_table
|
.object_table
|
||||||
.get(&source_bucket.to_string(), &source_key.to_string())
|
.get(&source_bucket.to_string(), &source_key.to_string())
|
||||||
|
@ -116,5 +114,5 @@ pub async fn handle_copy(
|
||||||
writeln!(&mut xml, "\t<LastModified>{}</LastModified>", last_modified).unwrap();
|
writeln!(&mut xml, "\t<LastModified>{}</LastModified>", last_modified).unwrap();
|
||||||
writeln!(&mut xml, "</CopyObjectResult>").unwrap();
|
writeln!(&mut xml, "</CopyObjectResult>").unwrap();
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
|
Ok(Response::new(Body::from(xml.into_bytes())))
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ use garage_model::garage::Garage;
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
|
|
||||||
use crate::encoding::*;
|
use crate::encoding::*;
|
||||||
use crate::http_util::*;
|
|
||||||
|
|
||||||
async fn handle_delete_internal(
|
async fn handle_delete_internal(
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
|
@ -70,13 +69,13 @@ pub async fn handle_delete(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let (_deleted_version, delete_marker_version) =
|
let (_deleted_version, delete_marker_version) =
|
||||||
handle_delete_internal(&garage, bucket, key).await?;
|
handle_delete_internal(&garage, bucket, key).await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.header("x-amz-version-id", hex::encode(delete_marker_version))
|
.header("x-amz-version-id", hex::encode(delete_marker_version))
|
||||||
.body(empty_body())
|
.body(Body::from(vec![]))
|
||||||
.unwrap())
|
.unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,7 +83,7 @@ pub async fn handle_delete_objects(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
|
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
|
||||||
let cmd = parse_delete_objects_xml(&cmd_xml)
|
let cmd = parse_delete_objects_xml(&cmd_xml)
|
||||||
|
@ -130,9 +129,9 @@ pub async fn handle_delete_objects(
|
||||||
|
|
||||||
writeln!(&mut retxml, "</DeleteObjectsOutput>").unwrap();
|
writeln!(&mut retxml, "</DeleteObjectsOutput>").unwrap();
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(
|
Ok(Response::new(Body::from(
|
||||||
retxml.into_bytes(),
|
retxml.into_bytes(),
|
||||||
))))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DeleteRequest {
|
struct DeleteRequest {
|
||||||
|
|
|
@ -12,8 +12,6 @@ use garage_table::EmptyKey;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
|
|
||||||
use crate::http_util::*;
|
|
||||||
|
|
||||||
fn object_headers(version: &ObjectVersion) -> http::response::Builder {
|
fn object_headers(version: &ObjectVersion) -> http::response::Builder {
|
||||||
let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
|
let date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
|
||||||
let date_str = httpdate::fmt_http_date(date);
|
let date_str = httpdate::fmt_http_date(date);
|
||||||
|
@ -29,7 +27,7 @@ pub async fn handle_head(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let object = match garage
|
let object = match garage
|
||||||
.object_table
|
.object_table
|
||||||
.get(&bucket.to_string(), &key.to_string())
|
.get(&bucket.to_string(), &key.to_string())
|
||||||
|
@ -50,7 +48,7 @@ pub async fn handle_head(
|
||||||
None => return Err(Error::NotFound),
|
None => return Err(Error::NotFound),
|
||||||
};
|
};
|
||||||
|
|
||||||
let body: BodyType = Box::new(BytesBody::from(vec![]));
|
let body: Body = Body::from(vec![]);
|
||||||
let response = object_headers(&version)
|
let response = object_headers(&version)
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
.body(body)
|
.body(body)
|
||||||
|
@ -63,7 +61,7 @@ pub async fn handle_get(
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let object = match garage
|
let object = match garage
|
||||||
.object_table
|
.object_table
|
||||||
.get(&bucket.to_string(), &key.to_string())
|
.get(&bucket.to_string(), &key.to_string())
|
||||||
|
@ -111,7 +109,7 @@ pub async fn handle_get(
|
||||||
))),
|
))),
|
||||||
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
|
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
|
||||||
ObjectVersionData::Inline(bytes) => {
|
ObjectVersionData::Inline(bytes) => {
|
||||||
let body: BodyType = Box::new(BytesBody::from(bytes.to_vec()));
|
let body: Body = Body::from(bytes.to_vec());
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
ObjectVersionData::FirstBlock(first_block_hash) => {
|
ObjectVersionData::FirstBlock(first_block_hash) => {
|
||||||
|
@ -147,7 +145,8 @@ pub async fn handle_get(
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.buffered(2);
|
.buffered(2);
|
||||||
let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
|
//let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
|
||||||
|
let body = hyper::body::Body::wrap_stream(body_stream);
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,7 +157,7 @@ pub async fn handle_get_range(
|
||||||
version: &ObjectVersion,
|
version: &ObjectVersion,
|
||||||
begin: u64,
|
begin: u64,
|
||||||
end: u64,
|
end: u64,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
if end > version.size {
|
if end > version.size {
|
||||||
return Err(Error::BadRequest(format!("Range not included in file")));
|
return Err(Error::BadRequest(format!("Range not included in file")));
|
||||||
}
|
}
|
||||||
|
@ -177,9 +176,9 @@ pub async fn handle_get_range(
|
||||||
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
|
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
|
||||||
ObjectVersionData::Inline(bytes) => {
|
ObjectVersionData::Inline(bytes) => {
|
||||||
if end as usize <= bytes.len() {
|
if end as usize <= bytes.len() {
|
||||||
let body: BodyType = Box::new(BytesBody::from(
|
let body: Body = Body::from(
|
||||||
bytes[begin as usize..end as usize].to_vec(),
|
bytes[begin as usize..end as usize].to_vec(),
|
||||||
));
|
);
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
|
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
|
||||||
|
@ -214,13 +213,14 @@ pub async fn handle_get_range(
|
||||||
} else {
|
} else {
|
||||||
end - block.offset
|
end - block.offset
|
||||||
};
|
};
|
||||||
Ok(Bytes::from(
|
Result::<Bytes,Error>::Ok(Bytes::from(
|
||||||
data[start_in_block as usize..end_in_block as usize].to_vec(),
|
data[start_in_block as usize..end_in_block as usize].to_vec(),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.buffered(2);
|
.buffered(2);
|
||||||
let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
|
//let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
|
||||||
|
let body = hyper::body::Body::wrap_stream(body_stream);
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,14 +3,13 @@ use std::fmt::Write;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
|
use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
|
||||||
use hyper::Response;
|
use hyper::{Body, Response};
|
||||||
|
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
use crate::encoding::*;
|
use crate::encoding::*;
|
||||||
use crate::http_util::*;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct ListResultInfo {
|
struct ListResultInfo {
|
||||||
|
@ -26,7 +25,7 @@ pub async fn handle_list(
|
||||||
prefix: &str,
|
prefix: &str,
|
||||||
marker: Option<&str>,
|
marker: Option<&str>,
|
||||||
urlencode_resp: bool,
|
urlencode_resp: bool,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
|
let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
|
||||||
let mut result_common_prefixes = BTreeSet::<String>::new();
|
let mut result_common_prefixes = BTreeSet::<String>::new();
|
||||||
|
|
||||||
|
@ -141,5 +140,5 @@ pub async fn handle_list(
|
||||||
writeln!(&mut xml, "</ListBucketResult>").unwrap();
|
writeln!(&mut xml, "</ListBucketResult>").unwrap();
|
||||||
println!("{}", xml);
|
println!("{}", xml);
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
|
Ok(Response::new(Body::from(xml.into_bytes())))
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,14 +16,13 @@ use garage_model::object_table::*;
|
||||||
use garage_model::version_table::*;
|
use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::encoding::*;
|
use crate::encoding::*;
|
||||||
use crate::http_util::*;
|
|
||||||
|
|
||||||
pub async fn handle_put(
|
pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let mime_type = get_mime_type(&req)?;
|
let mime_type = get_mime_type(&req)?;
|
||||||
let body = req.into_body();
|
let body = req.into_body();
|
||||||
|
@ -195,10 +194,10 @@ impl BodyChunker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_response(version_uuid: UUID) -> Response<BodyType> {
|
pub fn put_response(version_uuid: UUID) -> Response<Body> {
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.header("x-amz-version-id", hex::encode(version_uuid))
|
.header("x-amz-version-id", hex::encode(version_uuid))
|
||||||
.body(empty_body())
|
.body(Body::from(vec![]))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,7 +206,7 @@ pub async fn handle_create_multipart_upload(
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let mime_type = get_mime_type(req)?;
|
let mime_type = get_mime_type(req)?;
|
||||||
|
|
||||||
|
@ -239,7 +238,7 @@ pub async fn handle_create_multipart_upload(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
writeln!(&mut xml, "</InitiateMultipartUploadResult>").unwrap();
|
writeln!(&mut xml, "</InitiateMultipartUploadResult>").unwrap();
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
|
Ok(Response::new(Body::from(xml.into_bytes())))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_put_part(
|
pub async fn handle_put_part(
|
||||||
|
@ -249,7 +248,7 @@ pub async fn handle_put_part(
|
||||||
key: &str,
|
key: &str,
|
||||||
part_number_str: &str,
|
part_number_str: &str,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
// Check parameters
|
// Check parameters
|
||||||
let part_number = part_number_str
|
let part_number = part_number_str
|
||||||
.parse::<u64>()
|
.parse::<u64>()
|
||||||
|
@ -299,7 +298,7 @@ pub async fn handle_put_part(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(vec![]))))
|
Ok(Response::new(Body::from(vec![])))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_complete_multipart_upload(
|
pub async fn handle_complete_multipart_upload(
|
||||||
|
@ -308,7 +307,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let version_uuid =
|
let version_uuid =
|
||||||
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
|
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
|
||||||
|
|
||||||
|
@ -374,7 +373,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap();
|
writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap();
|
||||||
writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap();
|
writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap();
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
|
Ok(Response::new(Body::from(xml.into_bytes())))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_abort_multipart_upload(
|
pub async fn handle_abort_multipart_upload(
|
||||||
|
@ -382,7 +381,7 @@ pub async fn handle_abort_multipart_upload(
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
upload_id: &str,
|
upload_id: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let version_uuid =
|
let version_uuid =
|
||||||
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
|
uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
|
||||||
|
|
||||||
|
@ -412,7 +411,7 @@ pub async fn handle_abort_multipart_upload(
|
||||||
let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
|
let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(vec![]))))
|
Ok(Response::new(Body::from(vec![])))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
|
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
|
||||||
|
|
Loading…
Reference in a new issue