forked from Deuxfleurs/garage
Merge pull request 'Content-range fix' (#24) from bug/content-range into master
Reviewed-on: Deuxfleurs/garage#24
This commit is contained in:
commit
dfbc280c37
1 changed files with 34 additions and 17 deletions
|
@ -24,7 +24,6 @@ fn object_headers(
|
||||||
"Content-Type",
|
"Content-Type",
|
||||||
version_meta.headers.content_type.to_string(),
|
version_meta.headers.content_type.to_string(),
|
||||||
)
|
)
|
||||||
.header("Content-Length", format!("{}", version_meta.size))
|
|
||||||
.header("ETag", version_meta.etag.to_string())
|
.header("ETag", version_meta.etag.to_string())
|
||||||
.header("Last-Modified", date_str)
|
.header("Last-Modified", date_str)
|
||||||
.header("Accept-Ranges", format!("bytes"));
|
.header("Accept-Ranges", format!("bytes"));
|
||||||
|
@ -63,6 +62,7 @@ pub async fn handle_head(
|
||||||
|
|
||||||
let body: Body = Body::from(vec![]);
|
let body: Body = Body::from(vec![]);
|
||||||
let response = object_headers(&version, version_meta)
|
let response = object_headers(&version, version_meta)
|
||||||
|
.header("Content-Length", format!("{}", version_meta.size))
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
.body(body)
|
.body(body)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -123,7 +123,9 @@ pub async fn handle_get(
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp_builder = object_headers(&last_v, last_v_meta).status(StatusCode::OK);
|
let resp_builder = object_headers(&last_v, last_v_meta)
|
||||||
|
.header("Content-Length", format!("{}", last_v_meta.size))
|
||||||
|
.status(StatusCode::OK);
|
||||||
|
|
||||||
match &last_v_data {
|
match &last_v_data {
|
||||||
ObjectVersionData::DeleteMarker => unreachable!(),
|
ObjectVersionData::DeleteMarker => unreachable!(),
|
||||||
|
@ -161,7 +163,7 @@ pub async fn handle_get(
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.buffered(2);
|
.buffered(2);
|
||||||
//let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
|
|
||||||
let body = hyper::body::Body::wrap_stream(body_stream);
|
let body = hyper::body::Body::wrap_stream(body_stream);
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
|
@ -181,9 +183,10 @@ pub async fn handle_get_range(
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp_builder = object_headers(version, version_meta)
|
let resp_builder = object_headers(version, version_meta)
|
||||||
|
.header("Content-Length", format!("{}", end - begin))
|
||||||
.header(
|
.header(
|
||||||
"Content-Range",
|
"Content-Range",
|
||||||
format!("bytes {}-{}/{}", begin, end, version_meta.size),
|
format!("bytes {}-{}/{}", begin, end - 1, version_meta.size),
|
||||||
)
|
)
|
||||||
.status(StatusCode::PARTIAL_CONTENT);
|
.status(StatusCode::PARTIAL_CONTENT);
|
||||||
|
|
||||||
|
@ -206,35 +209,49 @@ pub async fn handle_get_range(
|
||||||
None => return Err(Error::NotFound),
|
None => return Err(Error::NotFound),
|
||||||
};
|
};
|
||||||
|
|
||||||
let blocks = version
|
// We will store here the list of blocks that have an intersection with the requested
|
||||||
.blocks()
|
// range, as well as their "true offset", which is their actual offset in the complete
|
||||||
.iter()
|
// file (whereas block.offset designates the offset of the block WITHIN THE PART
|
||||||
.cloned()
|
// block.part_number, which is not the same in the case of a multipart upload)
|
||||||
.filter(|block| block.offset + block.size > begin && block.offset < end)
|
let mut blocks = Vec::with_capacity(std::cmp::min(
|
||||||
.collect::<Vec<_>>();
|
version.blocks().len(),
|
||||||
|
4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize,
|
||||||
|
));
|
||||||
|
let mut true_offset = 0;
|
||||||
|
for b in version.blocks().iter() {
|
||||||
|
if true_offset >= end {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Keep only blocks that have an intersection with the requested range
|
||||||
|
if true_offset < end && true_offset + b.size > begin {
|
||||||
|
blocks.push((b.clone(), true_offset));
|
||||||
|
}
|
||||||
|
true_offset += b.size;
|
||||||
|
}
|
||||||
|
|
||||||
let body_stream = futures::stream::iter(blocks)
|
let body_stream = futures::stream::iter(blocks)
|
||||||
.map(move |block| {
|
.map(move |(block, true_offset)| {
|
||||||
let garage = garage.clone();
|
let garage = garage.clone();
|
||||||
async move {
|
async move {
|
||||||
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
|
let data = garage.block_manager.rpc_get_block(&block.hash).await?;
|
||||||
let start_in_block = if block.offset > begin {
|
let data = Bytes::from(data);
|
||||||
|
let start_in_block = if true_offset > begin {
|
||||||
0
|
0
|
||||||
} else {
|
} else {
|
||||||
begin - block.offset
|
begin - true_offset
|
||||||
};
|
};
|
||||||
let end_in_block = if block.offset + block.size < end {
|
let end_in_block = if true_offset + block.size < end {
|
||||||
block.size
|
block.size
|
||||||
} else {
|
} else {
|
||||||
end - block.offset
|
end - true_offset
|
||||||
};
|
};
|
||||||
Result::<Bytes, Error>::Ok(Bytes::from(
|
Result::<Bytes, Error>::Ok(Bytes::from(
|
||||||
data[start_in_block as usize..end_in_block as usize].to_vec(),
|
data.slice(start_in_block as usize..end_in_block as usize),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.buffered(2);
|
.buffered(2);
|
||||||
//let body: Body = Box::new(StreamBody::new(Box::pin(body_stream)));
|
|
||||||
let body = hyper::body::Body::wrap_stream(body_stream);
|
let body = hyper::body::Body::wrap_stream(body_stream);
|
||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue