diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 97cc6b9..f7ca156 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -1,30 +1,25 @@ -use std::collections::VecDeque; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use std::time::{Duration, UNIX_EPOCH}; use futures::future::Future; -use futures::stream::*; use hyper::body::{Bytes, HttpBody}; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use hyper::{Body, Method, Request, Response, Server}; -use garage_util::data::*; use garage_util::error::Error; -use garage_table::EmptyKey; - -use garage_core::block::INLINE_THRESHOLD; -use garage_core::block_ref_table::*; use garage_core::garage::Garage; -use garage_core::object_table::*; -use garage_core::version_table::*; use crate::http_util::*; use crate::signature::check_signature; -type BodyType = Box + Send + Unpin>; +use crate::s3_get::{handle_get, handle_head}; +use crate::s3_list::handle_list; +use crate::s3_put::{handle_delete, handle_put}; + +pub type BodyType = Box + Send + Unpin>; pub async fn run_api_server( garage: Arc, @@ -128,298 +123,41 @@ async fn handler_inner( _ => Err(Error::BadRequest(format!("Invalid method"))), } } else { - // TODO: listing buckets - Err(Error::Forbidden("Unimplemented".into())) - } -} - -async fn handle_put( - garage: Arc, - mime_type: &str, - bucket: &str, - key: &str, - body: Body, -) -> Result { - let version_uuid = gen_uuid(); - - let mut chunker = BodyChunker::new(body, garage.config.block_size); - let first_block = match chunker.next().await? { - Some(x) => x, - None => return Err(Error::BadRequest(format!("Empty body"))), - }; - - let mut object_version = ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - mime_type: mime_type.to_string(), - size: first_block.len() as u64, - is_complete: false, - data: ObjectVersionData::DeleteMarker, - }; - - if first_block.len() < INLINE_THRESHOLD { - object_version.data = ObjectVersionData::Inline(first_block); - object_version.is_complete = true; - - let object = Object::new(bucket.into(), key.into(), vec![object_version]); - garage.object_table.insert(&object).await?; - return Ok(version_uuid); - } - - let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); - - let first_block_hash = hash(&first_block[..]); - object_version.data = ObjectVersionData::FirstBlock(first_block_hash); - let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - - let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); - let mut put_curr_block = garage - .block_manager - .rpc_put_block(first_block_hash, first_block); - - loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; - if let Some(block) = next_block { - let block_hash = hash(&block[..]); - let block_len = block.len(); - put_curr_version_block = - put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); - put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); - next_offset += block_len; - } else { - break; - } - } - - // TODO: if at any step we have an error, we should undo everything we did - - object_version.is_complete = true; - object_version.size = next_offset as u64; - - let object = Object::new(bucket.into(), key.into(), vec![object_version]); - garage.object_table.insert(&object).await?; - - Ok(version_uuid) -} - -async fn put_block_meta( - garage: Arc, - version: &Version, - offset: u64, - hash: Hash, -) -> Result<(), Error> { - // TODO: don't clone, restart from empty block list ?? - let mut version = version.clone(); - version.add_block(VersionBlock { offset, hash }).unwrap(); - - let block_ref = BlockRef { - block: hash, - version: version.uuid, - deleted: false, - }; - - futures::try_join!( - garage.version_table.insert(&version), - garage.block_ref_table.insert(&block_ref), - )?; - Ok(()) -} - -struct BodyChunker { - body: Body, - read_all: bool, - block_size: usize, - buf: VecDeque, -} - -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { - Self { - body, - read_all: false, - block_size, - buf: VecDeque::new(), - } - } - async fn next(&mut self) -> Result>, Error> { - while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { - let bytes = block?; - trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); - } else { - self.read_all = true; - } - } - if self.buf.len() == 0 { - Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::>(); - Ok(Some(block)) - } else { - let block = self.buf.drain(..self.block_size).collect::>(); - Ok(Some(block)) - } - } -} - -async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Result { - let exists = match garage - .object_table - .get(&bucket.to_string(), &key.to_string()) - .await? - { - None => false, - Some(o) => { - let mut has_active_version = false; - for v in o.versions().iter() { - if v.data != ObjectVersionData::DeleteMarker { - has_active_version = true; - break; + match req.method() { + &Method::PUT | &Method::DELETE => Err(Error::Forbidden( + "Cannot create or delete buckets using S3 api, please talk to Garage directly" + .into(), + )), + &Method::GET => { + let mut params = HashMap::new(); + if let Some(query) = req.uri().query() { + let query_pairs = url::form_urlencoded::parse(query.as_bytes()); + for (key, val) in query_pairs { + params.insert(key.to_lowercase(), val.to_string()); + } + } + if ["delimiter", "prefix"] + .iter() + .all(|x| params.contains_key(&x.to_string())) + { + let delimiter = params.get("delimiter").unwrap(); + let max_keys = params + .get("max-keys") + .map(|x| { + x.parse::().map_err(|e| { + Error::BadRequest(format!("Invalid value for max-keys: {}", e)) + }) + }) + .unwrap_or(Ok(1000))?; + let prefix = params.get("prefix").unwrap(); + Ok(handle_list(garage, bucket, delimiter, max_keys, prefix).await?) + } else { + Err(Error::BadRequest(format!( + "Not a list call, so what is it?" + ))) } } - has_active_version - } - }; - - if !exists { - // No need to delete - return Ok([0u8; 32].into()); - } - - let version_uuid = gen_uuid(); - - let object = Object::new( - bucket.into(), - key.into(), - vec![ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - mime_type: "application/x-delete-marker".into(), - size: 0, - is_complete: true, - data: ObjectVersionData::DeleteMarker, - }], - ); - - garage.object_table.insert(&object).await?; - return Ok(version_uuid); -} - -fn object_headers(version: &ObjectVersion) -> http::response::Builder { - let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); - let date_str = httpdate::fmt_http_date(date); - - Response::builder() - .header("Content-Type", version.mime_type.to_string()) - .header("Content-Length", format!("{}", version.size)) - .header("Last-Modified", date_str) -} - -async fn handle_head( - garage: Arc, - bucket: &str, - key: &str, -) -> Result, Error> { - let object = match garage - .object_table - .get(&bucket.to_string(), &key.to_string()) - .await? - { - None => return Err(Error::NotFound), - Some(o) => o, - }; - - let version = match object - .versions() - .iter() - .rev() - .filter(|v| v.is_complete && v.data != ObjectVersionData::DeleteMarker) - .next() - { - Some(v) => v, - None => return Err(Error::NotFound), - }; - - let body: BodyType = Box::new(BytesBody::from(vec![])); - let response = object_headers(&version) - .status(StatusCode::OK) - .body(body) - .unwrap(); - Ok(response) -} - -async fn handle_get( - garage: Arc, - bucket: &str, - key: &str, -) -> Result, Error> { - let object = match garage - .object_table - .get(&bucket.to_string(), &key.to_string()) - .await? - { - None => return Err(Error::NotFound), - Some(o) => o, - }; - - let last_v = match object - .versions() - .iter() - .rev() - .filter(|v| v.is_complete) - .next() - { - Some(v) => v, - None => return Err(Error::NotFound), - }; - - let resp_builder = object_headers(&last_v).status(StatusCode::OK); - - match &last_v.data { - ObjectVersionData::DeleteMarker => Err(Error::NotFound), - ObjectVersionData::Inline(bytes) => { - let body: BodyType = Box::new(BytesBody::from(bytes.to_vec())); - Ok(resp_builder.body(body)?) - } - ObjectVersionData::FirstBlock(first_block_hash) => { - let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); - let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); - - let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; - let version = match version { - Some(v) => v, - None => return Err(Error::NotFound), - }; - - let mut blocks = version - .blocks() - .iter() - .map(|vb| (vb.hash, None)) - .collect::>(); - blocks[0].1 = Some(first_block); - - let body_stream = futures::stream::iter(blocks) - .map(move |(hash, data_opt)| { - let garage = garage.clone(); - async move { - if let Some(data) = data_opt { - Ok(Bytes::from(data)) - } else { - garage - .block_manager - .rpc_get_block(&hash) - .await - .map(Bytes::from) - } - } - }) - .buffered(2); - let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); - Ok(resp_builder.body(body)?) + _ => Err(Error::BadRequest(format!("Invalid method"))), } } } diff --git a/src/api/lib.rs b/src/api/lib.rs index 3bcd02f..da53722 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -4,3 +4,7 @@ extern crate log; pub mod api_server; pub mod http_util; pub mod signature; + +pub mod s3_get; +pub mod s3_list; +pub mod s3_put; diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs new file mode 100644 index 0000000..b933f54 --- /dev/null +++ b/src/api/s3_get.rs @@ -0,0 +1,131 @@ +use std::sync::Arc; +use std::time::{Duration, UNIX_EPOCH}; + +use futures::stream::*; +use hyper::body::Bytes; +use hyper::{Response, StatusCode}; + +use garage_util::error::Error; + +use garage_table::EmptyKey; + +use garage_core::garage::Garage; +use garage_core::object_table::*; + +use crate::api_server::BodyType; +use crate::http_util::*; + +fn object_headers(version: &ObjectVersion) -> http::response::Builder { + let date = UNIX_EPOCH + Duration::from_millis(version.timestamp); + let date_str = httpdate::fmt_http_date(date); + + Response::builder() + .header("Content-Type", version.mime_type.to_string()) + .header("Content-Length", format!("{}", version.size)) + .header("Last-Modified", date_str) +} + +pub async fn handle_head( + garage: Arc, + bucket: &str, + key: &str, +) -> Result, Error> { + let object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => return Err(Error::NotFound), + Some(o) => o, + }; + + let version = match object + .versions() + .iter() + .rev() + .filter(|v| v.is_complete && v.data != ObjectVersionData::DeleteMarker) + .next() + { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let body: BodyType = Box::new(BytesBody::from(vec![])); + let response = object_headers(&version) + .status(StatusCode::OK) + .body(body) + .unwrap(); + Ok(response) +} + +pub async fn handle_get( + garage: Arc, + bucket: &str, + key: &str, +) -> Result, Error> { + let object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => return Err(Error::NotFound), + Some(o) => o, + }; + + let last_v = match object + .versions() + .iter() + .rev() + .filter(|v| v.is_complete) + .next() + { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let resp_builder = object_headers(&last_v).status(StatusCode::OK); + + match &last_v.data { + ObjectVersionData::DeleteMarker => Err(Error::NotFound), + ObjectVersionData::Inline(bytes) => { + let body: BodyType = Box::new(BytesBody::from(bytes.to_vec())); + Ok(resp_builder.body(body)?) + } + ObjectVersionData::FirstBlock(first_block_hash) => { + let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); + + let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let version = match version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let mut blocks = version + .blocks() + .iter() + .map(|vb| (vb.hash, None)) + .collect::>(); + blocks[0].1 = Some(first_block); + + let body_stream = futures::stream::iter(blocks) + .map(move |(hash, data_opt)| { + let garage = garage.clone(); + async move { + if let Some(data) = data_opt { + Ok(Bytes::from(data)) + } else { + garage + .block_manager + .rpc_get_block(&hash) + .await + .map(Bytes::from) + } + } + }) + .buffered(2); + let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); + Ok(resp_builder.body(body)?) + } + } +} diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs new file mode 100644 index 0000000..6004bff --- /dev/null +++ b/src/api/s3_list.rs @@ -0,0 +1,112 @@ +use std::collections::BTreeMap; +use std::fmt::Write; +use std::sync::Arc; + +use chrono::{DateTime, NaiveDateTime, Utc, SecondsFormat}; +use hyper::Response; + +use garage_util::error::Error; + +use garage_core::garage::Garage; +use garage_core::object_table::*; + +use crate::api_server::BodyType; +use crate::http_util::*; + +#[derive(Debug)] +struct ListResultInfo { + last_modified: u64, + size: u64, +} + +pub async fn handle_list( + garage: Arc, + bucket: &str, + delimiter: &str, + max_keys: usize, + prefix: &str, +) -> Result, Error> { + let mut result = BTreeMap::::new(); + let mut truncated = true; + let mut next_chunk_start = prefix.to_string(); + + println!("List request: `{}` {} `{}`", delimiter, max_keys, prefix); + + while result.len() < max_keys && truncated { + let objects = garage + .object_table + .get_range( + &bucket.to_string(), + Some(next_chunk_start.clone()), + Some(()), + max_keys, + ) + .await?; + for object in objects.iter() { + if let Some(version) = object + .versions() + .iter() + .find(|x| x.is_complete && x.data != ObjectVersionData::DeleteMarker) + { + let relative_key = match object.key.starts_with(prefix) { + true => &object.key[prefix.len()..], + false => { + truncated = false; + break; + } + }; + let delimited_key = match relative_key.find(delimiter) { + Some(i) => relative_key.split_at(i).1, + None => &relative_key, + }; + let delimited_key = delimited_key.to_string(); + let new_info = match result.get(&delimited_key) { + None => ListResultInfo { + last_modified: version.timestamp, + size: version.size, + }, + Some(lri) => ListResultInfo { + last_modified: std::cmp::max(version.timestamp, lri.last_modified), + size: 0, + }, + }; + println!("Entry: {} {:?}", delimited_key, new_info); + result.insert(delimited_key, new_info); + } + } + if objects.len() < max_keys { + truncated = false; + } + if objects.len() > 0 { + next_chunk_start = objects[objects.len() - 1].key.clone(); + } + } + + let mut xml = String::new(); + writeln!(&mut xml, r#""#).unwrap(); + writeln!( + &mut xml, + r#""# + ) + .unwrap(); + writeln!(&mut xml, "\t{}", bucket).unwrap(); + writeln!(&mut xml, "\t{}", prefix).unwrap(); + writeln!(&mut xml, "\t{}", result.len()).unwrap(); + writeln!(&mut xml, "\t{}", max_keys).unwrap(); + writeln!(&mut xml, "\t{}", truncated).unwrap(); + for (key, info) in result.iter() { + let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0); + let last_modif = DateTime::::from_utc(last_modif, Utc); + let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true); + writeln!(&mut xml, "\t").unwrap(); + writeln!(&mut xml, "\t\t{}", key).unwrap(); + writeln!(&mut xml, "\t\t{}", last_modif).unwrap(); + writeln!(&mut xml, "\t\t{}", info.size).unwrap(); + writeln!(&mut xml, "\t\tSTANDARD").unwrap(); + writeln!(&mut xml, "\t").unwrap(); + } + writeln!(&mut xml, "").unwrap(); + println!("{}", xml); + + Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) +} diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs new file mode 100644 index 0000000..15c2ccb --- /dev/null +++ b/src/api/s3_put.rs @@ -0,0 +1,190 @@ +use std::collections::VecDeque; +use std::sync::Arc; + +use futures::stream::*; +use hyper::Body; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_core::block::INLINE_THRESHOLD; +use garage_core::block_ref_table::*; +use garage_core::garage::Garage; +use garage_core::object_table::*; +use garage_core::version_table::*; + +pub async fn handle_put( + garage: Arc, + mime_type: &str, + bucket: &str, + key: &str, + body: Body, +) -> Result { + let version_uuid = gen_uuid(); + + let mut chunker = BodyChunker::new(body, garage.config.block_size); + let first_block = match chunker.next().await? { + Some(x) => x, + None => return Err(Error::BadRequest(format!("Empty body"))), + }; + + let mut object_version = ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: mime_type.to_string(), + size: first_block.len() as u64, + is_complete: false, + data: ObjectVersionData::DeleteMarker, + }; + + if first_block.len() < INLINE_THRESHOLD { + object_version.data = ObjectVersionData::Inline(first_block); + object_version.is_complete = true; + + let object = Object::new(bucket.into(), key.into(), vec![object_version]); + garage.object_table.insert(&object).await?; + return Ok(version_uuid); + } + + let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); + + let first_block_hash = hash(&first_block[..]); + object_version.data = ObjectVersionData::FirstBlock(first_block_hash); + let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); + garage.object_table.insert(&object).await?; + + let mut next_offset = first_block.len(); + let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); + let mut put_curr_block = garage + .block_manager + .rpc_put_block(first_block_hash, first_block); + + loop { + let (_, _, next_block) = + futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + if let Some(block) = next_block { + let block_hash = hash(&block[..]); + let block_len = block.len(); + put_curr_version_block = + put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); + put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); + next_offset += block_len; + } else { + break; + } + } + + // TODO: if at any step we have an error, we should undo everything we did + + object_version.is_complete = true; + object_version.size = next_offset as u64; + + let object = Object::new(bucket.into(), key.into(), vec![object_version]); + garage.object_table.insert(&object).await?; + + Ok(version_uuid) +} + +async fn put_block_meta( + garage: Arc, + version: &Version, + offset: u64, + hash: Hash, +) -> Result<(), Error> { + // TODO: don't clone, restart from empty block list ?? + let mut version = version.clone(); + version.add_block(VersionBlock { offset, hash }).unwrap(); + + let block_ref = BlockRef { + block: hash, + version: version.uuid, + deleted: false, + }; + + futures::try_join!( + garage.version_table.insert(&version), + garage.block_ref_table.insert(&block_ref), + )?; + Ok(()) +} + +struct BodyChunker { + body: Body, + read_all: bool, + block_size: usize, + buf: VecDeque, +} + +impl BodyChunker { + fn new(body: Body, block_size: usize) -> Self { + Self { + body, + read_all: false, + block_size, + buf: VecDeque::new(), + } + } + async fn next(&mut self) -> Result>, Error> { + while !self.read_all && self.buf.len() < self.block_size { + if let Some(block) = self.body.next().await { + let bytes = block?; + trace!("Body next: {} bytes", bytes.len()); + self.buf.extend(&bytes[..]); + } else { + self.read_all = true; + } + } + if self.buf.len() == 0 { + Ok(None) + } else if self.buf.len() <= self.block_size { + let block = self.buf.drain(..).collect::>(); + Ok(Some(block)) + } else { + let block = self.buf.drain(..self.block_size).collect::>(); + Ok(Some(block)) + } + } +} + +pub async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Result { + let exists = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => false, + Some(o) => { + let mut has_active_version = false; + for v in o.versions().iter() { + if v.data != ObjectVersionData::DeleteMarker { + has_active_version = true; + break; + } + } + has_active_version + } + }; + + if !exists { + // No need to delete + return Ok([0u8; 32].into()); + } + + let version_uuid = gen_uuid(); + + let object = Object::new( + bucket.into(), + key.into(), + vec![ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: "application/x-delete-marker".into(), + size: 0, + is_complete: true, + data: ObjectVersionData::DeleteMarker, + }], + ); + + garage.object_table.insert(&object).await?; + return Ok(version_uuid); +} diff --git a/src/core/object_table.rs b/src/core/object_table.rs index 1fe2b3d..e4c7852 100644 --- a/src/core/object_table.rs +++ b/src/core/object_table.rs @@ -158,8 +158,10 @@ impl TableSchema for ObjectTable { Ok(()) } - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { - // TODO - true + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + entry + .versions + .iter() + .any(|x| x.is_complete && x.data != ObjectVersionData::DeleteMarker) } }