diff --git a/Cargo.lock b/Cargo.lock
index 97e0522..c389056 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -371,6 +371,7 @@ dependencies = [
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "http-range 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -568,6 +569,11 @@ dependencies = [
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
+[[package]]
+name = "http-range"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
[[package]]
name = "httparse"
version = "1.3.4"
@@ -1556,6 +1562,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5dcb5e64cda4c23119ab41ba960d1e170a774c8e4b9d9e6a9bc18aabf5e59695"
"checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
"checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
+"checksum http-range 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5f2e4003e6fd05ea9109db00415e670b11f511a42e567ff2d5d771cbdfa24e02"
"checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
"checksum httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
"checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index b8674fb..56e0e2a 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -32,3 +32,4 @@ url = "2.1"
httpdate = "0.3"
percent-encoding = "2.1.0"
roxmltree = "0.11"
+http-range = "0.1"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 913ff0a..32506cc 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -73,8 +73,7 @@ async fn handler_inner(
req: Request
,
) -> Result, Error> {
let path = req.uri().path().to_string();
- let path = percent_encoding::percent_decode_str(&path)
- .decode_utf8()?;
+ let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
let (bucket, key) = parse_bucket_key(&path)?;
if bucket.len() == 0 {
@@ -110,7 +109,7 @@ async fn handler_inner(
}
&Method::GET => {
// GetObject query
- Ok(handle_get(garage, &bucket, &key).await?)
+ Ok(handle_get(garage, &req, &bucket, &key).await?)
}
&Method::PUT => {
if params.contains_key(&"partnumber".to_string())
@@ -123,8 +122,8 @@ async fn handler_inner(
} else if req.headers().contains_key("x-amz-copy-source") {
// CopyObject query
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
- let copy_source = percent_encoding::percent_decode_str(©_source)
- .decode_utf8()?;
+ let copy_source =
+ percent_encoding::percent_decode_str(©_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(©_source)?;
if !api_key.allow_read(&source_bucket) {
return Err(Error::Forbidden(format!(
@@ -228,22 +227,20 @@ async fn handler_inner(
)
.await?)
}
- &Method::POST => {
- if params.contains_key(&"delete".to_string()) {
- // DeleteObjects
- Ok(handle_delete_objects(garage, bucket, req).await?)
- } else {
- println!(
- "Body: {}",
- std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
- .unwrap_or("")
- );
- Err(Error::BadRequest(format!("Unsupported call")))
- }
- }
- _ => {
- Err(Error::BadRequest(format!("Invalid method")))
- }
+ &Method::POST => {
+ if params.contains_key(&"delete".to_string()) {
+ // DeleteObjects
+ Ok(handle_delete_objects(garage, bucket, req).await?)
+ } else {
+ println!(
+ "Body: {}",
+ std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
+ .unwrap_or("")
+ );
+ Err(Error::BadRequest(format!("Unsupported call")))
+ }
+ }
+ _ => Err(Error::BadRequest(format!("Invalid method"))),
}
}
}
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 001eb16..e77ab31 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -1,5 +1,5 @@
-use std::sync::Arc;
use std::fmt::Write;
+use std::sync::Arc;
use hyper::{Body, Request, Response};
@@ -9,10 +9,14 @@ use garage_util::error::Error;
use garage_core::garage::Garage;
use garage_core::object_table::*;
-use crate::http_util::*;
use crate::encoding::*;
+use crate::http_util::*;
-async fn handle_delete_internal(garage: &Garage, bucket: &str, key: &str) -> Result<(UUID, UUID), Error> {
+async fn handle_delete_internal(
+ garage: &Garage,
+ bucket: &str,
+ key: &str,
+) -> Result<(UUID, UUID), Error> {
let object = match garage
.object_table
.get(&bucket.to_string(), &key.to_string())
@@ -32,16 +36,16 @@ async fn handle_delete_internal(garage: &Garage, bucket: &str, key: &str) -> Res
let mut must_delete = None;
let mut timestamp = now_msec();
for v in interesting_versions {
- if v.timestamp + 1 > timestamp || must_delete.is_none() {
- must_delete = Some(v.uuid);
- }
+ if v.timestamp + 1 > timestamp || must_delete.is_none() {
+ must_delete = Some(v.uuid);
+ }
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
}
- let deleted_version = match must_delete {
- None => return Err(Error::NotFound),
- Some(v) => v,
- };
+ let deleted_version = match must_delete {
+ None => return Err(Error::NotFound),
+ Some(v) => v,
+ };
let version_uuid = gen_uuid();
@@ -62,8 +66,13 @@ async fn handle_delete_internal(garage: &Garage, bucket: &str, key: &str) -> Res
return Ok((deleted_version, version_uuid));
}
-pub async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Result, Error> {
- let (_deleted_version, delete_marker_version) = handle_delete_internal(&garage, bucket, key).await?;
+pub async fn handle_delete(
+ garage: Arc,
+ bucket: &str,
+ key: &str,
+) -> Result, Error> {
+ let (_deleted_version, delete_marker_version) =
+ handle_delete_internal(&garage, bucket, key).await?;
Ok(Response::builder()
.header("x-amz-version-id", hex::encode(delete_marker_version))
@@ -71,76 +80,98 @@ pub async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Resu
.unwrap())
}
-pub async fn handle_delete_objects(garage: Arc, bucket: &str, req: Request) -> Result, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
- let cmd = parse_delete_objects_xml(&cmd_xml)
- .map_err(|e| Error::BadRequest(format!("Invald delete XML query: {}", e)))?;
+pub async fn handle_delete_objects(
+ garage: Arc,
+ bucket: &str,
+ req: Request,
+) -> Result, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
+ let cmd = parse_delete_objects_xml(&cmd_xml)
+ .map_err(|e| Error::BadRequest(format!("Invald delete XML query: {}", e)))?;
- let mut retxml = String::new();
+ let mut retxml = String::new();
writeln!(&mut retxml, r#""#).unwrap();
writeln!(&mut retxml, "").unwrap();
- for obj in cmd.objects.iter() {
- match handle_delete_internal(&garage, bucket, &obj.key).await {
- Ok((deleted_version, delete_marker_version)) => {
- writeln!(&mut retxml, "\t").unwrap();
- writeln!(&mut retxml, "\t\t{}", obj.key).unwrap();
- writeln!(&mut retxml, "\t\t{}", hex::encode(deleted_version)).unwrap();
- writeln!(&mut retxml, "\t\t{}", hex::encode(delete_marker_version)).unwrap();
- writeln!(&mut retxml, "\t").unwrap();
- }
- Err(e) => {
- writeln!(&mut retxml, "\t").unwrap();
- writeln!(&mut retxml, "\t\t{}
", e.http_status_code()).unwrap();
- writeln!(&mut retxml, "\t\t{}", obj.key).unwrap();
- writeln!(&mut retxml, "\t\t{}", xml_escape(&format!("{}", e))).unwrap();
- writeln!(&mut retxml, "\t").unwrap();
- }
- }
- }
+ for obj in cmd.objects.iter() {
+ match handle_delete_internal(&garage, bucket, &obj.key).await {
+ Ok((deleted_version, delete_marker_version)) => {
+ writeln!(&mut retxml, "\t").unwrap();
+ writeln!(&mut retxml, "\t\t{}", obj.key).unwrap();
+ writeln!(
+ &mut retxml,
+ "\t\t{}",
+ hex::encode(deleted_version)
+ )
+ .unwrap();
+ writeln!(
+ &mut retxml,
+ "\t\t{}",
+ hex::encode(delete_marker_version)
+ )
+ .unwrap();
+ writeln!(&mut retxml, "\t").unwrap();
+ }
+ Err(e) => {
+ writeln!(&mut retxml, "\t").unwrap();
+ writeln!(&mut retxml, "\t\t{}
", e.http_status_code()).unwrap();
+ writeln!(&mut retxml, "\t\t{}", obj.key).unwrap();
+ writeln!(
+ &mut retxml,
+ "\t\t{}",
+ xml_escape(&format!("{}", e))
+ )
+ .unwrap();
+ writeln!(&mut retxml, "\t").unwrap();
+ }
+ }
+ }
writeln!(&mut retxml, "").unwrap();
- Ok(Response::new(Box::new(BytesBody::from(retxml.into_bytes()))))
+ Ok(Response::new(Box::new(BytesBody::from(
+ retxml.into_bytes(),
+ ))))
}
struct DeleteRequest {
- objects: Vec,
+ objects: Vec,
}
struct DeleteObject {
- key: String,
+ key: String,
}
fn parse_delete_objects_xml(xml: &roxmltree::Document) -> Result {
- let mut ret = DeleteRequest{objects: vec![]};
+ let mut ret = DeleteRequest { objects: vec![] };
- let root = xml.root();
- let delete = match root.first_child() {
- Some(del) => del,
- None => return Err(format!("Delete tag not found")),
- };
- if !delete.has_tag_name("Delete") {
- return Err(format!("Invalid root tag: {:?}", root));
- }
+ let root = xml.root();
+ let delete = match root.first_child() {
+ Some(del) => del,
+ None => return Err(format!("Delete tag not found")),
+ };
+ if !delete.has_tag_name("Delete") {
+ return Err(format!("Invalid root tag: {:?}", root));
+ }
- for item in delete.children() {
- if item.has_tag_name("Object") {
- if let Some(key) = item.children().find(|e| e.has_tag_name("Key")) {
- if let Some(key_str) = key.text() {
- ret.objects.push(DeleteObject{key: key_str.to_string()});
- } else {
- return Err(format!("No text for key: {:?}", key));
- }
- } else {
- return Err(format!("No delete key for item: {:?}", item));
- }
- } else {
- return Err(format!("Invalid delete item: {:?}", item));
- }
- }
+ for item in delete.children() {
+ if item.has_tag_name("Object") {
+ if let Some(key) = item.children().find(|e| e.has_tag_name("Key")) {
+ if let Some(key_str) = key.text() {
+ ret.objects.push(DeleteObject {
+ key: key_str.to_string(),
+ });
+ } else {
+ return Err(format!("No text for key: {:?}", key));
+ }
+ } else {
+ return Err(format!("No delete key for item: {:?}", item));
+ }
+ } else {
+ return Err(format!("Invalid delete item: {:?}", item));
+ }
+ }
- Ok(ret)
+ Ok(ret)
}
-
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 75478e4..3ed0f91 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -3,7 +3,7 @@ use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*;
use hyper::body::Bytes;
-use hyper::{Response, StatusCode};
+use hyper::{Body, Request, Response, StatusCode};
use garage_util::error::Error;
@@ -22,6 +22,7 @@ fn object_headers(version: &ObjectVersion) -> http::response::Builder {
.header("Content-Type", version.mime_type.to_string())
.header("Content-Length", format!("{}", version.size))
.header("Last-Modified", date_str)
+ .header("Accept-Ranges", format!("bytes"))
}
pub async fn handle_head(
@@ -59,6 +60,7 @@ pub async fn handle_head(
pub async fn handle_get(
garage: Arc,
+ req: &Request,
bucket: &str,
key: &str,
) -> Result, Error> {
@@ -82,6 +84,25 @@ pub async fn handle_get(
None => return Err(Error::NotFound),
};
+ let range = match req.headers().get("range") {
+ Some(range) => {
+ let range_str = range
+ .to_str()
+ .map_err(|e| Error::BadRequest(format!("Invalid range header: {}", e)))?;
+ let mut ranges = http_range::HttpRange::parse(range_str, last_v.size)
+ .map_err(|_e| Error::BadRequest(format!("Invalid range")))?;
+ if ranges.len() > 1 {
+ return Err(Error::BadRequest(format!("Multiple ranges not supported")));
+ } else {
+ ranges.pop()
+ }
+ }
+ None => None,
+ };
+ if let Some(range) = range {
+ return handle_get_range(garage, last_v, range.start, range.start + range.length).await;
+ }
+
let resp_builder = object_headers(&last_v).status(StatusCode::OK);
match &last_v.data {
@@ -131,3 +152,76 @@ pub async fn handle_get(
}
}
}
+
+pub async fn handle_get_range(
+ garage: Arc,
+ version: &ObjectVersion,
+ begin: u64,
+ end: u64,
+) -> Result, Error> {
+ if end > version.size {
+ return Err(Error::BadRequest(format!("Range not included in file")));
+ }
+
+ let resp_builder = object_headers(&version)
+ .header(
+ "Content-Range",
+ format!("bytes {}-{}/{}", begin, end, version.size),
+ )
+ .status(StatusCode::PARTIAL_CONTENT);
+
+ match &version.data {
+ ObjectVersionData::Uploading => Err(Error::Message(format!(
+ "Version is_complete() but data is stil Uploading (internal error)"
+ ))),
+ ObjectVersionData::DeleteMarker => Err(Error::NotFound),
+ ObjectVersionData::Inline(bytes) => {
+ if end as usize <= bytes.len() {
+ let body: BodyType = Box::new(BytesBody::from(
+ bytes[begin as usize..end as usize].to_vec(),
+ ));
+ Ok(resp_builder.body(body)?)
+ } else {
+ Err(Error::Message(format!("Internal error: requested range not present in inline bytes when it should have been")))
+ }
+ }
+ ObjectVersionData::FirstBlock(_first_block_hash) => {
+ let version = garage.version_table.get(&version.uuid, &EmptyKey).await?;
+ let version = match version {
+ Some(v) => v,
+ None => return Err(Error::NotFound),
+ };
+
+ let blocks = version
+ .blocks()
+ .iter()
+ .cloned()
+ .filter(|block| block.offset + block.size > begin && block.offset < end)
+ .collect::>();
+
+ let body_stream = futures::stream::iter(blocks)
+ .map(move |block| {
+ let garage = garage.clone();
+ async move {
+ let data = garage.block_manager.rpc_get_block(&block.hash).await?;
+ let start_in_block = if block.offset > begin {
+ 0
+ } else {
+ begin - block.offset
+ };
+ let end_in_block = if block.offset + block.size < end {
+ block.size
+ } else {
+ end - block.offset
+ };
+ Ok(Bytes::from(
+ data[start_in_block as usize..end_in_block as usize].to_vec(),
+ ))
+ }
+ })
+ .buffered(2);
+ let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
+ Ok(resp_builder.body(body)?)
+ }
+ }
+}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index d4d8161..b8babbb 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -58,10 +58,10 @@ pub async fn handle_list(
break 'query_loop;
}
if let Some(version) = object.versions().iter().find(|x| x.is_data()) {
- if result_keys.len() + result_common_prefixes.len() >= max_keys {
- truncated = true;
- break 'query_loop;
- }
+ if result_keys.len() + result_common_prefixes.len() >= max_keys {
+ truncated = true;
+ break 'query_loop;
+ }
let common_prefix = if delimiter.len() > 0 {
let relative_key = &object.key[prefix.len()..];
match relative_key.find(delimiter) {
@@ -88,8 +88,8 @@ pub async fn handle_list(
}
}
if objects.len() < max_keys + 1 {
- truncated = false;
- break 'query_loop;
+ truncated = false;
+ break 'query_loop;
}
if objects.len() > 0 {
next_chunk_start = objects[objects.len() - 1].key.clone();
diff --git a/src/util/error.rs b/src/util/error.rs
index d2ed1cc..0ca1afe 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -116,13 +116,13 @@ impl From> for Error {
}
impl From for Error {
- fn from(e: std::str::Utf8Error) -> Error {
- Error::BadRequest(format!("Invalid UTF-8: {}", e))
- }
+ fn from(e: std::str::Utf8Error) -> Error {
+ Error::BadRequest(format!("Invalid UTF-8: {}", e))
+ }
}
impl From for Error {
- fn from(e: roxmltree::Error) -> Error {
- Error::BadRequest(format!("Invalid XML: {}", e))
- }
+ fn from(e: roxmltree::Error) -> Error {
+ Error::BadRequest(format!("Invalid XML: {}", e))
+ }
}