forked from Deuxfleurs/garage
Implement HTTP ranges in get
This commit is contained in:
parent
16fbb32fd3
commit
b46a7788d1
7 changed files with 229 additions and 99 deletions
7
Cargo.lock
generated
7
Cargo.lock
generated
|
@ -371,6 +371,7 @@ dependencies = [
|
||||||
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"http-range 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"hyper 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
"hyper 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
@ -568,6 +569,11 @@ dependencies = [
|
||||||
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "http-range"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "httparse"
|
name = "httparse"
|
||||||
version = "1.3.4"
|
version = "1.3.4"
|
||||||
|
@ -1556,6 +1562,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
"checksum hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5dcb5e64cda4c23119ab41ba960d1e170a774c8e4b9d9e6a9bc18aabf5e59695"
|
"checksum hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5dcb5e64cda4c23119ab41ba960d1e170a774c8e4b9d9e6a9bc18aabf5e59695"
|
||||||
"checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
|
"checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
|
||||||
"checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
|
"checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
|
||||||
|
"checksum http-range 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2e4003e6fd05ea9109db00415e670b11f511a42e567ff2d5d771cbdfa24e02"
|
||||||
"checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
|
"checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
|
||||||
"checksum httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
|
"checksum httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
|
||||||
"checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
|
"checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
|
||||||
|
|
|
@ -32,3 +32,4 @@ url = "2.1"
|
||||||
httpdate = "0.3"
|
httpdate = "0.3"
|
||||||
percent-encoding = "2.1.0"
|
percent-encoding = "2.1.0"
|
||||||
roxmltree = "0.11"
|
roxmltree = "0.11"
|
||||||
|
http-range = "0.1"
|
||||||
|
|
|
@ -73,8 +73,7 @@ async fn handler_inner(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<BodyType>, Error> {
|
||||||
let path = req.uri().path().to_string();
|
let path = req.uri().path().to_string();
|
||||||
let path = percent_encoding::percent_decode_str(&path)
|
let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
|
||||||
.decode_utf8()?;
|
|
||||||
|
|
||||||
let (bucket, key) = parse_bucket_key(&path)?;
|
let (bucket, key) = parse_bucket_key(&path)?;
|
||||||
if bucket.len() == 0 {
|
if bucket.len() == 0 {
|
||||||
|
@ -110,7 +109,7 @@ async fn handler_inner(
|
||||||
}
|
}
|
||||||
&Method::GET => {
|
&Method::GET => {
|
||||||
// GetObject query
|
// GetObject query
|
||||||
Ok(handle_get(garage, &bucket, &key).await?)
|
Ok(handle_get(garage, &req, &bucket, &key).await?)
|
||||||
}
|
}
|
||||||
&Method::PUT => {
|
&Method::PUT => {
|
||||||
if params.contains_key(&"partnumber".to_string())
|
if params.contains_key(&"partnumber".to_string())
|
||||||
|
@ -123,8 +122,8 @@ async fn handler_inner(
|
||||||
} else if req.headers().contains_key("x-amz-copy-source") {
|
} else if req.headers().contains_key("x-amz-copy-source") {
|
||||||
// CopyObject query
|
// CopyObject query
|
||||||
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
||||||
let copy_source = percent_encoding::percent_decode_str(©_source)
|
let copy_source =
|
||||||
.decode_utf8()?;
|
percent_encoding::percent_decode_str(©_source).decode_utf8()?;
|
||||||
let (source_bucket, source_key) = parse_bucket_key(©_source)?;
|
let (source_bucket, source_key) = parse_bucket_key(©_source)?;
|
||||||
if !api_key.allow_read(&source_bucket) {
|
if !api_key.allow_read(&source_bucket) {
|
||||||
return Err(Error::Forbidden(format!(
|
return Err(Error::Forbidden(format!(
|
||||||
|
@ -241,9 +240,7 @@ async fn handler_inner(
|
||||||
Err(Error::BadRequest(format!("Unsupported call")))
|
Err(Error::BadRequest(format!("Unsupported call")))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => Err(Error::BadRequest(format!("Invalid method"))),
|
||||||
Err(Error::BadRequest(format!("Invalid method")))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::sync::Arc;
|
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use hyper::{Body, Request, Response};
|
use hyper::{Body, Request, Response};
|
||||||
|
|
||||||
|
@ -9,10 +9,14 @@ use garage_util::error::Error;
|
||||||
use garage_core::garage::Garage;
|
use garage_core::garage::Garage;
|
||||||
use garage_core::object_table::*;
|
use garage_core::object_table::*;
|
||||||
|
|
||||||
use crate::http_util::*;
|
|
||||||
use crate::encoding::*;
|
use crate::encoding::*;
|
||||||
|
use crate::http_util::*;
|
||||||
|
|
||||||
async fn handle_delete_internal(garage: &Garage, bucket: &str, key: &str) -> Result<(UUID, UUID), Error> {
|
async fn handle_delete_internal(
|
||||||
|
garage: &Garage,
|
||||||
|
bucket: &str,
|
||||||
|
key: &str,
|
||||||
|
) -> Result<(UUID, UUID), Error> {
|
||||||
let object = match garage
|
let object = match garage
|
||||||
.object_table
|
.object_table
|
||||||
.get(&bucket.to_string(), &key.to_string())
|
.get(&bucket.to_string(), &key.to_string())
|
||||||
|
@ -62,8 +66,13 @@ async fn handle_delete_internal(garage: &Garage, bucket: &str, key: &str) -> Res
|
||||||
return Ok((deleted_version, version_uuid));
|
return Ok((deleted_version, version_uuid));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<Response<BodyType>, Error> {
|
pub async fn handle_delete(
|
||||||
let (_deleted_version, delete_marker_version) = handle_delete_internal(&garage, bucket, key).await?;
|
garage: Arc<Garage>,
|
||||||
|
bucket: &str,
|
||||||
|
key: &str,
|
||||||
|
) -> Result<Response<BodyType>, Error> {
|
||||||
|
let (_deleted_version, delete_marker_version) =
|
||||||
|
handle_delete_internal(&garage, bucket, key).await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.header("x-amz-version-id", hex::encode(delete_marker_version))
|
.header("x-amz-version-id", hex::encode(delete_marker_version))
|
||||||
|
@ -71,7 +80,11 @@ pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Resu
|
||||||
.unwrap())
|
.unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete_objects(garage: Arc<Garage>, bucket: &str, req: Request<Body>) -> Result<Response<BodyType>, Error> {
|
pub async fn handle_delete_objects(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
bucket: &str,
|
||||||
|
req: Request<Body>,
|
||||||
|
) -> Result<Response<BodyType>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
|
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
|
||||||
let cmd = parse_delete_objects_xml(&cmd_xml)
|
let cmd = parse_delete_objects_xml(&cmd_xml)
|
||||||
|
@ -86,15 +99,30 @@ pub async fn handle_delete_objects(garage: Arc<Garage>, bucket: &str, req: Reque
|
||||||
Ok((deleted_version, delete_marker_version)) => {
|
Ok((deleted_version, delete_marker_version)) => {
|
||||||
writeln!(&mut retxml, "\t<Deleted>").unwrap();
|
writeln!(&mut retxml, "\t<Deleted>").unwrap();
|
||||||
writeln!(&mut retxml, "\t\t<Key>{}</Key>", obj.key).unwrap();
|
writeln!(&mut retxml, "\t\t<Key>{}</Key>", obj.key).unwrap();
|
||||||
writeln!(&mut retxml, "\t\t<VersionId>{}</VersionId>", hex::encode(deleted_version)).unwrap();
|
writeln!(
|
||||||
writeln!(&mut retxml, "\t\t<DeleteMarkerVersionId>{}</DeleteMarkerVersionId>", hex::encode(delete_marker_version)).unwrap();
|
&mut retxml,
|
||||||
|
"\t\t<VersionId>{}</VersionId>",
|
||||||
|
hex::encode(deleted_version)
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
writeln!(
|
||||||
|
&mut retxml,
|
||||||
|
"\t\t<DeleteMarkerVersionId>{}</DeleteMarkerVersionId>",
|
||||||
|
hex::encode(delete_marker_version)
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
writeln!(&mut retxml, "\t</Deleted>").unwrap();
|
writeln!(&mut retxml, "\t</Deleted>").unwrap();
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
writeln!(&mut retxml, "\t<Error>").unwrap();
|
writeln!(&mut retxml, "\t<Error>").unwrap();
|
||||||
writeln!(&mut retxml, "\t\t<Code>{}</Code>", e.http_status_code()).unwrap();
|
writeln!(&mut retxml, "\t\t<Code>{}</Code>", e.http_status_code()).unwrap();
|
||||||
writeln!(&mut retxml, "\t\t<Key>{}</Key>", obj.key).unwrap();
|
writeln!(&mut retxml, "\t\t<Key>{}</Key>", obj.key).unwrap();
|
||||||
writeln!(&mut retxml, "\t\t<Message>{}</Message>", xml_escape(&format!("{}", e))).unwrap();
|
writeln!(
|
||||||
|
&mut retxml,
|
||||||
|
"\t\t<Message>{}</Message>",
|
||||||
|
xml_escape(&format!("{}", e))
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
writeln!(&mut retxml, "\t</Error>").unwrap();
|
writeln!(&mut retxml, "\t</Error>").unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,7 +130,9 @@ pub async fn handle_delete_objects(garage: Arc<Garage>, bucket: &str, req: Reque
|
||||||
|
|
||||||
writeln!(&mut retxml, "</DeleteObjectsOutput>").unwrap();
|
writeln!(&mut retxml, "</DeleteObjectsOutput>").unwrap();
|
||||||
|
|
||||||
Ok(Response::new(Box::new(BytesBody::from(retxml.into_bytes()))))
|
Ok(Response::new(Box::new(BytesBody::from(
|
||||||
|
retxml.into_bytes(),
|
||||||
|
))))
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DeleteRequest {
|
struct DeleteRequest {
|
||||||
|
@ -129,7 +159,9 @@ fn parse_delete_objects_xml(xml: &roxmltree::Document) -> Result<DeleteRequest,
|
||||||
if item.has_tag_name("Object") {
|
if item.has_tag_name("Object") {
|
||||||
if let Some(key) = item.children().find(|e| e.has_tag_name("Key")) {
|
if let Some(key) = item.children().find(|e| e.has_tag_name("Key")) {
|
||||||
if let Some(key_str) = key.text() {
|
if let Some(key_str) = key.text() {
|
||||||
ret.objects.push(DeleteObject{key: key_str.to_string()});
|
ret.objects.push(DeleteObject {
|
||||||
|
key: key_str.to_string(),
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
return Err(format!("No text for key: {:?}", key));
|
return Err(format!("No text for key: {:?}", key));
|
||||||
}
|
}
|
||||||
|
@ -143,4 +175,3 @@ fn parse_delete_objects_xml(xml: &roxmltree::Document) -> Result<DeleteRequest,
|
||||||
|
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::time::{Duration, UNIX_EPOCH};
|
||||||
|
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
use hyper::{Response, StatusCode};
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
|
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ fn object_headers(version: &ObjectVersion) -> http::response::Builder {
|
||||||
.header("Content-Type", version.mime_type.to_string())
|
.header("Content-Type", version.mime_type.to_string())
|
||||||
.header("Content-Length", format!("{}", version.size))
|
.header("Content-Length", format!("{}", version.size))
|
||||||
.header("Last-Modified", date_str)
|
.header("Last-Modified", date_str)
|
||||||
|
.header("Accept-Ranges", format!("bytes"))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_head(
|
pub async fn handle_head(
|
||||||
|
@ -59,6 +60,7 @@ pub async fn handle_head(
|
||||||
|
|
||||||
pub async fn handle_get(
|
pub async fn handle_get(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
|
req: &Request<Body>,
|
||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<BodyType>, Error> {
|
) -> Result<Response<BodyType>, Error> {
|
||||||
|
@ -82,6 +84,25 @@ pub async fn handle_get(
|
||||||
None => return Err(Error::NotFound),
|
None => return Err(Error::NotFound),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let range = match req.headers().get("range") {
|
||||||
|
Some(range) => {
|
||||||
|
let range_str = range
|
||||||
|
.to_str()
|
||||||
|
.map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?;
|
||||||
|
let mut ranges = http_range::HttpRange::parse(range_str, last_v.size)
|
||||||
|
.map_err(|_e| Error::BadRequest(format!("Invalid range")))?;
|
||||||
|
if ranges.len() > 1 {
|
||||||
|
return Err(Error::BadRequest(format!("Multiple ranges not supported")));
|
||||||
|
} else {
|
||||||
|
ranges.pop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
if let Some(range) = range {
|
||||||
|
return handle_get_range(garage, last_v, range.start, range.start + range.length).await;
|
||||||
|
}
|
||||||
|
|
||||||
let resp_builder = object_headers(&last_v).status(StatusCode::OK);
|
let resp_builder = object_headers(&last_v).status(StatusCode::OK);
|
||||||
|
|
||||||
match &last_v.data {
|
match &last_v.data {
|
||||||
|
@ -131,3 +152,76 @@ pub async fn handle_get(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn handle_get_range(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
version: &ObjectVersion,
|
||||||
|
begin: u64,
|
||||||
|
end: u64,
|
||||||
|
) -> Result<Response<BodyType>, Error> {
|
||||||
|
if end > version.size {
|
||||||
|
return Err(Error::BadRequest(format!("Range not included in file")));
|
||||||
|
}
|
||||||
|
|
||||||
|
let resp_builder = object_headers(&version)
|
||||||
|
.header(
|
||||||
|
"Content-Range",
|
||||||
|
format!("bytes {}-{}/{}", begin, end, version.size),
|
||||||
|
)
|
||||||
|
.status(StatusCode::PARTIAL_CONTENT);
|
||||||
|
|
||||||
|
match &version.data {
|
||||||
|
ObjectVersionData::Uploading => Err(Error::Message(format!(
|
||||||
|
"Version is_complete() but data is stil Uploading (internal error)"
|
||||||
|
))),
|
||||||
|
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
|
||||||
|
ObjectVersionData::Inline(bytes) => {
|
||||||
|
if end as usize <= bytes.len() {
|
||||||
|
let body: BodyType = Box::new(BytesBody::from(
|
||||||
|
bytes[begin as usize..end as usize].to_vec(),
|
||||||
|
));
|
||||||
|
Ok(resp_builder.body(body)?)
|
||||||
|
} else {
|
||||||
|
Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ObjectVersionData::FirstBlock(_first_block_hash) => {
|
||||||
|
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
|
||||||
|
let version = match version {
|
||||||
|
Some(v) => v,
|
||||||
|
None => return Err(Error::NotFound),
|
||||||
|
};
|
||||||
|
|
||||||
|
let blocks = version
|
||||||
|
.blocks()
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.filter(|block| block.offset + block.size > begin && block.offset < end)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let body_stream = futures::stream::iter(blocks)
|
||||||
|
.map(move |block| {
|
||||||
|
let garage = garage.clone();
|
||||||
|
async move {
|
||||||
|
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
|
||||||
|
let start_in_block = if block.offset > begin {
|
||||||
|
0
|
||||||
|
} else {
|
||||||
|
begin - block.offset
|
||||||
|
};
|
||||||
|
let end_in_block = if block.offset + block.size < end {
|
||||||
|
block.size
|
||||||
|
} else {
|
||||||
|
end - block.offset
|
||||||
|
};
|
||||||
|
Ok(Bytes::from(
|
||||||
|
data[start_in_block as usize..end_in_block as usize].to_vec(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.buffered(2);
|
||||||
|
let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
|
||||||
|
Ok(resp_builder.body(body)?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue