Implement part_number for GetObject

This commit is contained in:
Alex 2022-01-13 15:29:39 +01:00
parent 6dab836f3a
commit 338b1b83ee
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1

View file

@ -5,7 +5,7 @@ use std::time::{Duration, UNIX_EPOCH};
use futures::stream::*; use futures::stream::*;
use http::header::{ use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, IF_NONE_MATCH, LAST_MODIFIED, RANGE,
}; };
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
@ -15,6 +15,7 @@ use garage_util::data::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::object_table::*; use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*; use crate::error::*;
@ -168,12 +169,6 @@ pub async fn handle_get(
key: &str, key: &str,
part_number: Option<u64>, part_number: Option<u64>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
if part_number.is_some() {
return Err(Error::NotImplemented(
"part_number not supported for GetObject".into(),
));
}
let object = garage let object = garage
.object_table .object_table
.get(&bucket_id, &key.to_string()) .get(&bucket_id, &key.to_string())
@ -201,22 +196,13 @@ pub async fn handle_get(
return Ok(cached); return Ok(cached);
} }
let range = match req.headers().get("range") { if let Some(pn) = part_number {
Some(range) => { return handle_get_part(garage, req, last_v, last_v_data, last_v_meta, pn).await;
let range_str = range.to_str()?; }
let mut ranges = http_range::HttpRange::parse(range_str, last_v_meta.size)
.map_err(|e| (e, last_v_meta.size))?; // No part_number specified, it's a normal get object
if ranges.len() > 1 {
// garage does not support multi-range requests yet, so we respond with the entire if let Some(range) = parse_range_header(req, last_v_meta.size)? {
// object when multiple ranges are requested
None
} else {
ranges.pop()
}
}
None => None,
};
if let Some(range) = range {
return handle_get_range( return handle_get_range(
garage, garage,
last_v, last_v,
@ -305,58 +291,145 @@ async fn handle_get_range(
} }
} }
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => { ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
let version = garage.version_table.get(&version.uuid, &EmptyKey).await?; let version = garage
let version = match version { .version_table
Some(v) => v, .get(&version.uuid, &EmptyKey)
None => return Err(Error::NoSuchKey), .await?
}; .ok_or(Error::NoSuchKey)?;
// We will store here the list of blocks that have an intersection with the requested let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
// range, as well as their "true offset", which is their actual offset in the complete
// file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min(
version.blocks.len(),
4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
as usize,
));
let mut true_offset = 0;
for (_, b) in version.blocks.items().iter() {
if true_offset >= end {
break;
}
// Keep only blocks that have an intersection with the requested range
if true_offset < end && true_offset + b.size > begin {
blocks.push((*b, true_offset));
}
true_offset += b.size;
}
let body_stream = futures::stream::iter(blocks)
.map(move |(block, true_offset)| {
let garage = garage.clone();
async move {
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
let data = Bytes::from(data);
let start_in_block = if true_offset > begin {
0
} else {
begin - true_offset
};
let end_in_block = if true_offset + block.size < end {
block.size
} else {
end - true_offset
};
Result::<Bytes, Error>::Ok(
data.slice(start_in_block as usize..end_in_block as usize),
)
}
})
.buffered(2);
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} }
} }
} }
async fn handle_get_part(
garage: Arc<Garage>,
req: &Request<Body>,
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
part_number: u64,
) -> Result<Response<Body>, Error> {
let version = if let ObjectVersionData::FirstBlock(_, _) = version_data {
garage
.version_table
.get(&object_version.uuid, &EmptyKey)
.await?
.ok_or(Error::NoSuchKey)?
} else {
return Err(Error::BadRequest(
"Cannot handle part_number: not a multipart upload.".into(),
));
};
let blocks = version
.blocks
.items()
.iter()
.filter(|(k, _)| k.part_number == part_number)
.cloned()
.collect::<Vec<_>>();
if blocks.is_empty() {
return Err(Error::BadRequest(format!("No such part: {}", part_number)));
}
let part_size = blocks.iter().map(|(_, b)| b.size).sum();
if let Some(r) = parse_range_header(req, part_size)? {
let range_begin = r.start;
let range_end = r.start + r.length;
let body = body_from_blocks_range(garage, &blocks[..], range_begin, range_end);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", range_end - range_begin))
.header(
CONTENT_RANGE,
format!("bytes {}-{}/{}", range_begin, range_end - 1, part_size),
)
.status(StatusCode::PARTIAL_CONTENT)
.body(body)?)
} else {
let body = body_from_blocks_range(garage, &blocks[..], 0, part_size);
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", part_size))
.status(StatusCode::OK)
.body(body)?)
}
}
/// Extracts the byte range requested through the `Range` header of `req`, if any.
///
/// `total_size` is the size of the entity the range applies to; it is handed to
/// `http_range::HttpRange::parse` and attached to the error if parsing fails.
///
/// Returns `Ok(None)` when the header is absent, and also when more than one range
/// is requested: multi-range requests are not supported yet, so the caller is
/// expected to answer with the entire entity in that case.
///
/// # Errors
///
/// Fails if the header value cannot be read as a string or cannot be parsed as a
/// range for an entity of `total_size` bytes.
fn parse_range_header(
    req: &Request<Body>,
    total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
    let header_value = match req.headers().get(RANGE) {
        Some(value) => value,
        None => return Ok(None),
    };

    let mut parsed = http_range::HttpRange::parse(header_value.to_str()?, total_size)
        .map_err(|e| (e, total_size))?;

    if parsed.len() > 1 {
        // garage does not support multi-range requests yet, so we respond with the entire
        // object when multiple ranges are requested
        return Ok(None);
    }
    Ok(parsed.pop())
}
/// Builds a streaming response body containing bytes `begin..end` of the data formed
/// by concatenating `all_blocks` in slice order.
///
/// `begin` is inclusive and `end` is exclusive. Both are offsets into the
/// concatenation of the blocks that were passed in, so a caller that passes only the
/// blocks of one part gets offsets relative to that part.
///
/// Blocks are fetched through `garage.block_manager.rpc_get_block` when the body is
/// polled, with up to two fetches buffered at a time. A fetch error is surfaced as
/// an error item of the body stream.
///
/// An empty `all_blocks` slice or an empty/inverted range (`begin >= end`) yields an
/// empty body and fetches no block.
fn body_from_blocks_range(
    garage: Arc<Garage>,
    all_blocks: &[(VersionBlockKey, VersionBlock)],
    begin: u64,
    end: u64,
) -> Body {
    // Saturating: an inverted range must not underflow, it simply selects nothing.
    let range_len = end.saturating_sub(begin);

    // Capacity hint only: estimate how many blocks the range spans from the size of the
    // first block (at least 1024, which also avoids dividing by zero). `first()` keeps an
    // empty block list from panicking. The estimate is computed in u64 and capped by the
    // number of blocks before narrowing, so the final cast cannot truncate.
    let first_block_size = all_blocks.first().map_or(0, |(_, b)| b.size);
    let capacity = std::cmp::min(
        all_blocks.len() as u64,
        4 + range_len / std::cmp::max(first_block_size, 1024),
    ) as usize;

    // We will store here the list of blocks that have an intersection with the requested
    // range, as well as their "true offset", which is their actual offset in the complete
    // file (whereas block.offset designates the offset of the block WITHIN THE PART
    // block.part_number, which is not the same in the case of a multipart upload)
    let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(capacity);
    if range_len > 0 {
        let mut true_offset = 0;
        for (_, b) in all_blocks.iter() {
            if true_offset >= end {
                break;
            }
            // Keep only blocks that have an intersection with the requested range
            // (`true_offset < end` is already guaranteed by the check above).
            if true_offset + b.size > begin {
                blocks.push((*b, true_offset));
            }
            true_offset += b.size;
        }
    }

    let body_stream = futures::stream::iter(blocks)
        .map(move |(block, true_offset)| {
            let garage = garage.clone();
            async move {
                let data = garage.block_manager.rpc_get_block(&block.hash).await?;
                let data = Bytes::from(data);
                // Trim the first and last selected blocks to the requested range;
                // blocks fully inside the range are passed through whole.
                let start_in_block = if true_offset > begin {
                    0
                } else {
                    begin - true_offset
                };
                let end_in_block = if true_offset + block.size < end {
                    block.size
                } else {
                    end - true_offset
                };
                Result::<Bytes, Error>::Ok(
                    data.slice(start_in_block as usize..end_in_block as usize),
                )
            }
        })
        .buffered(2);

    hyper::body::Body::wrap_stream(body_stream)
}