Prepare for multipart uploads

This commit is contained in:
Alex 2020-04-26 18:55:13 +00:00
parent ea7e4748ed
commit 9cb870f950
7 changed files with 61 additions and 33 deletions

4
TODO
View file

@ -8,8 +8,10 @@ We will have to introduce lots of dummy data and then add/remove nodes many time
Attaining S3 compatibility Attaining S3 compatibility
-------------------------- --------------------------
- api_server following the S3 semantics for head/get/put/list/delete: verify more that it works as intended
- multipart uploads - multipart uploads
- fix sync not working in some cases ? (when starting from empty?)
- api_server following the S3 semantics for head/get/put/list/delete: verify more that it works as intended
- PUT requests: verify content-md5 if provided - PUT requests: verify content-md5 if provided
- possibly other necessary endpoints ? - possibly other necessary endpoints ?

View file

@ -134,10 +134,9 @@ async fn handler_inner(
.body(empty_body) .body(empty_body)
.unwrap(); .unwrap();
Ok(response) Ok(response)
}, }
&Method::DELETE => Err(Error::Forbidden( &Method::DELETE => Err(Error::Forbidden(
"Cannot delete buckets using S3 api, please talk to Garage directly" "Cannot delete buckets using S3 api, please talk to Garage directly".into(),
.into(),
)), )),
&Method::GET => { &Method::GET => {
let mut params = HashMap::new(); let mut params = HashMap::new();

View file

@ -43,7 +43,7 @@ pub async fn handle_head(
.versions() .versions()
.iter() .iter()
.rev() .rev()
.filter(|v| v.is_complete && v.data != ObjectVersionData::DeleteMarker) .filter(|v| v.is_data())
.next() .next()
{ {
Some(v) => v, Some(v) => v,
@ -76,7 +76,7 @@ pub async fn handle_get(
.versions() .versions()
.iter() .iter()
.rev() .rev()
.filter(|v| v.is_complete) .filter(|v| v.is_complete())
.next() .next()
{ {
Some(v) => v, Some(v) => v,

View file

@ -8,7 +8,6 @@ use hyper::Response;
use garage_util::error::Error; use garage_util::error::Error;
use garage_core::garage::Garage; use garage_core::garage::Garage;
use garage_core::object_table::*;
use crate::api_server::BodyType; use crate::api_server::BodyType;
use crate::http_util::*; use crate::http_util::*;
@ -44,11 +43,7 @@ pub async fn handle_list(
) )
.await?; .await?;
for object in objects.iter() { for object in objects.iter() {
if let Some(version) = object if let Some(version) = object.versions().iter().find(|x| x.is_data()) {
.versions()
.iter()
.find(|x| x.is_complete && x.data != ObjectVersionData::DeleteMarker)
{
if !object.key.starts_with(prefix) { if !object.key.starts_with(prefix) {
truncated = false; truncated = false;
break; break;
@ -56,7 +51,7 @@ pub async fn handle_list(
let common_prefix = if delimiter.len() > 0 { let common_prefix = if delimiter.len() > 0 {
let relative_key = &object.key[prefix.len()..]; let relative_key = &object.key[prefix.len()..];
match relative_key.find(delimiter) { match relative_key.find(delimiter) {
Some(i) => Some(&object.key[..prefix.len()+i+delimiter.len()]), Some(i) => Some(&object.key[..prefix.len() + i + delimiter.len()]),
None => None, None => None,
} }
} else { } else {
@ -70,7 +65,9 @@ pub async fn handle_list(
last_modified: version.timestamp, last_modified: version.timestamp,
size: version.size, size: version.size,
}, },
Some(_lri) => return Err(Error::Message(format!("Duplicate key?? {}", object.key))), Some(_lri) => {
return Err(Error::Message(format!("Duplicate key?? {}", object.key)))
}
}; };
result_keys.insert(object.key.clone(), info); result_keys.insert(object.key.clone(), info);
}; };
@ -117,7 +114,7 @@ pub async fn handle_list(
writeln!(&mut xml, "</ListBucketResult>").unwrap(); writeln!(&mut xml, "</ListBucketResult>").unwrap();
Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes()))))
} }
fn xml_escape(s: &str) -> String { fn xml_escape(s: &str) -> String {
s.replace("<", "&lt;") s.replace("<", "&lt;")
.replace(">", "&gt;") .replace(">", "&gt;")

View file

@ -33,13 +33,13 @@ pub async fn handle_put(
timestamp: now_msec(), timestamp: now_msec(),
mime_type: mime_type.to_string(), mime_type: mime_type.to_string(),
size: first_block.len() as u64, size: first_block.len() as u64,
is_complete: false, state: ObjectVersionState::Uploading,
data: ObjectVersionData::DeleteMarker, data: ObjectVersionData::DeleteMarker,
}; };
if first_block.len() < INLINE_THRESHOLD { if first_block.len() < INLINE_THRESHOLD {
object_version.data = ObjectVersionData::Inline(first_block); object_version.data = ObjectVersionData::Inline(first_block);
object_version.is_complete = true; object_version.state = ObjectVersionState::Complete;
let object = Object::new(bucket.into(), key.into(), vec![object_version]); let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
@ -54,7 +54,8 @@ pub async fn handle_put(
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
let mut next_offset = first_block.len(); let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); let mut put_curr_version_block =
put_block_meta(garage.clone(), &version, 0, 0, first_block_hash);
let mut put_curr_block = garage let mut put_curr_block = garage
.block_manager .block_manager
.rpc_put_block(first_block_hash, first_block); .rpc_put_block(first_block_hash, first_block);
@ -66,7 +67,7 @@ pub async fn handle_put(
let block_hash = hash(&block[..]); let block_hash = hash(&block[..]);
let block_len = block.len(); let block_len = block.len();
put_curr_version_block = put_curr_version_block =
put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); put_block_meta(garage.clone(), &version, 0, next_offset as u64, block_hash);
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len; next_offset += block_len;
} else { } else {
@ -76,7 +77,7 @@ pub async fn handle_put(
// TODO: if at any step we have an error, we should undo everything we did // TODO: if at any step we have an error, we should undo everything we did
object_version.is_complete = true; object_version.state = ObjectVersionState::Complete;
object_version.size = next_offset as u64; object_version.size = next_offset as u64;
let object = Object::new(bucket.into(), key.into(), vec![object_version]); let object = Object::new(bucket.into(), key.into(), vec![object_version]);
@ -88,12 +89,19 @@ pub async fn handle_put(
async fn put_block_meta( async fn put_block_meta(
garage: Arc<Garage>, garage: Arc<Garage>,
version: &Version, version: &Version,
part_number: u64,
offset: u64, offset: u64,
hash: Hash, hash: Hash,
) -> Result<(), Error> { ) -> Result<(), Error> {
// TODO: don't clone, restart from empty block list ?? // TODO: don't clone, restart from empty block list ??
let mut version = version.clone(); let mut version = version.clone();
version.add_block(VersionBlock { offset, hash }).unwrap(); version
.add_block(VersionBlock {
part_number,
offset,
hash,
})
.unwrap();
let block_ref = BlockRef { let block_ref = BlockRef {
block: hash, block: hash,
@ -180,7 +188,7 @@ pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Resu
timestamp: now_msec(), timestamp: now_msec(),
mime_type: "application/x-delete-marker".into(), mime_type: "application/x-delete-marker".into(),
size: 0, size: 0,
is_complete: true, state: ObjectVersionState::Complete,
data: ObjectVersionData::DeleteMarker, data: ObjectVersionData::DeleteMarker,
}], }],
); );

View file

@ -61,11 +61,31 @@ pub struct ObjectVersion {
pub mime_type: String, pub mime_type: String,
pub size: u64, pub size: u64,
pub is_complete: bool, pub state: ObjectVersionState,
pub data: ObjectVersionData, pub data: ObjectVersionData,
} }
#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
Uploading,
Complete,
Aborted,
}
impl ObjectVersionState {
fn max(self, other: Self) -> Self {
use ObjectVersionState::*;
if self == Aborted || other == Aborted {
Aborted
} else if self == Complete || other == Complete {
Complete
} else {
Uploading
}
}
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData { pub enum ObjectVersionData {
DeleteMarker, DeleteMarker,
@ -74,8 +94,14 @@ pub enum ObjectVersionData {
} }
impl ObjectVersion { impl ObjectVersion {
fn cmp_key(&self) -> (u64, &UUID) { fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, &self.uuid) (self.timestamp, self.uuid)
}
pub fn is_complete(&self) -> bool {
self.state == ObjectVersionState::Complete
}
pub fn is_data(&self) -> bool {
self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker
} }
} }
@ -98,9 +124,7 @@ impl Entry<String, String> for Object {
if other_v.size > v.size { if other_v.size > v.size {
v.size = other_v.size; v.size = other_v.size;
} }
if other_v.is_complete && !v.is_complete { v.state = v.state.max(other_v.state);
v.is_complete = true;
}
} }
Err(i) => { Err(i) => {
self.versions.insert(i, other_v.clone()); self.versions.insert(i, other_v.clone());
@ -112,7 +136,7 @@ impl Entry<String, String> for Object {
.iter() .iter()
.enumerate() .enumerate()
.rev() .rev()
.filter(|(_, v)| v.is_complete) .filter(|(_, v)| v.is_complete())
.next() .next()
.map(|(vi, _)| vi); .map(|(vi, _)| vi);
@ -159,9 +183,6 @@ impl TableSchema for ObjectTable {
} }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
entry entry.versions.iter().any(|v| v.is_data())
.versions
.iter()
.any(|x| x.is_complete && x.data != ObjectVersionData::DeleteMarker)
} }
} }

View file

@ -64,6 +64,7 @@ impl Version {
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct VersionBlock { pub struct VersionBlock {
pub part_number: u64,
pub offset: u64, pub offset: u64,
pub hash: Hash, pub hash: Hash,
} }