diff --git a/src/api_server.rs b/src/api_server.rs index ff7c536c..a92fd36b 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -39,16 +39,10 @@ pub async fn run_api_server(garage: Arc, shutdown_signal: impl Future, req: Request, addr: SocketAddr) -> Result, Error> { match handler_inner(garage, req, addr).await { Ok(x) => Ok(x), - Err(Error::BadRequest(e)) => { - let mut bad_request = Response::new(Body::from(format!("{}\n", e))); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - Ok(bad_request) - } Err(e) => { - let mut ise = Response::new(Body::from( - format!("Internal server error: {}\n", e))); - *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - Ok(ise) + let mut http_error = Response::new(Body::from(format!("{}\n", e))); + *http_error.status_mut() = e.http_status_code(); + Ok(http_error) } } } @@ -65,9 +59,7 @@ async fn handler_inner(garage: Arc, req: Request, addr: SocketAddr match req.method() { &Method::GET => { - Ok(Response::new(Body::from( - "TODO: implement GET object", - ))) + Ok(handle_get(garage, &bucket, &key).await?) } &Method::PUT => { let mime_type = req.headers() @@ -97,27 +89,30 @@ async fn handle_put(garage: Arc, None => return Err(Error::BadRequest(format!("Empty body"))), }; - let mut version = VersionMeta{ + let mut object = Object { bucket: bucket.into(), key: key.into(), - timestamp: now_msec(), + versions: Vec::new(), + }; + object.versions.push(Box::new(Version{ uuid: version_uuid.clone(), + timestamp: now_msec(), mime_type: mime_type.to_string(), size: first_block.len() as u64, is_complete: false, data: VersionData::DeleteMarker, - }; + })); if first_block.len() < INLINE_THRESHOLD { - version.data = VersionData::Inline(first_block); - version.is_complete = true; - garage.version_table.insert(&version).await?; + object.versions[0].data = VersionData::Inline(first_block); + object.versions[0].is_complete = true; + garage.object_table.insert(&object).await?; return Ok(version_uuid) } let first_block_hash = hash(&first_block[..]); - version.data = VersionData::FirstBlock(first_block_hash); - garage.version_table.insert(&version).await?; + object.versions[0].data = VersionData::FirstBlock(first_block_hash); + garage.object_table.insert(&object).await?; let block_meta = BlockMeta{ version_uuid: version_uuid.clone(), @@ -143,8 +138,9 @@ async fn handle_put(garage: Arc, // TODO: if at any step we have an error, we should undo everything we did - version.is_complete = true; - garage.version_table.insert(&version).await?; + object.versions[0].is_complete = true; + object.versions[0].size = next_offset as u64; + garage.object_table.insert(&object).await?; Ok(version_uuid) } @@ -198,3 +194,32 @@ impl BodyChunker { } } } + +async fn handle_get(garage: Arc, bucket: &str, key: &str) -> Result, Error> { + let mut object = match garage.object_table.get(&bucket.to_string(), &key.to_string()).await? { + None => return Err(Error::NotFound), + Some(o) => o + }; + + let last_v = match object.versions.drain(..) + .rev().filter(|v| v.is_complete) + .next() { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let resp_builder = Response::builder() + .header("Content-Type", last_v.mime_type) + .status(StatusCode::OK); + + match last_v.data { + VersionData::DeleteMarker => Err(Error::NotFound), + VersionData::Inline(bytes) => { + Ok(resp_builder.body(bytes.into())?) + } + VersionData::FirstBlock(hash) => { + // TODO + unimplemented!() + } + } +} diff --git a/src/data.rs b/src/data.rs index f01d5394..14846fe2 100644 --- a/src/data.rs +++ b/src/data.rs @@ -121,7 +121,7 @@ pub struct SplitpointMeta { pub deleted: bool, } -pub use crate::version_table::*; +pub use crate::object_table::*; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BlockMeta { diff --git a/src/error.rs b/src/error.rs index 1481234f..578f73e9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ -use err_derive::Error; use std::io; +use err_derive::Error; +use hyper::StatusCode; #[derive(Debug, Error)] pub enum Error { @@ -32,9 +33,22 @@ pub enum Error { #[error(display = "RPC error: {}", _0)] RPCError(String), - #[error(display = "{}", _0)] + #[error(display = "Bad request: {}", _0)] BadRequest(String), + #[error(display = "Not found")] + NotFound, + #[error(display = "{}", _0)] Message(String), } + +impl Error { + pub fn http_status_code(&self) -> StatusCode { + match self { + Error::BadRequest(_) => StatusCode::BAD_REQUEST, + Error::NotFound => StatusCode::NOT_FOUND, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/src/main.rs b/src/main.rs index aa0f23dc..2303e7a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod proto; mod membership; mod table; -mod version_table; +mod object_table; mod server; mod rpc_server; diff --git a/src/object_table.rs b/src/object_table.rs new file mode 100644 index 00000000..37c02225 --- /dev/null +++ b/src/object_table.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; +use serde::{Serialize, Deserialize}; +use async_trait::async_trait; +use tokio::sync::RwLock; + +use crate::data::*; +use crate::table::*; +use crate::server::Garage; + + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Object { + pub bucket: String, + pub key: String, + + pub versions: Vec>, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Version { + pub uuid: UUID, + pub timestamp: u64, + + pub mime_type: String, + pub size: u64, + pub is_complete: bool, + + pub data: VersionData, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum VersionData { + DeleteMarker, + Inline(#[serde(with="serde_bytes")] Vec), + FirstBlock(Hash), +} + +pub struct ObjectTable { + pub garage: RwLock>>, +} + +impl Entry for Object { + fn partition_key(&self) -> &String { + &self.bucket + } + fn sort_key(&self) -> &String { + &self.key + } + + fn merge(&mut self, other: &Self) { + for other_v in other.versions.iter() { + match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) { + Ok(i) => { + let mut v = &mut self.versions[i]; + if other_v.size > v.size { + v.size = other_v.size; + } + if other_v.is_complete { + v.is_complete = true; + } + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + let last_complete = self.versions + .iter().enumerate().rev() + .filter(|(_, v)| v.is_complete) + .next() + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::>(); + } + } +} + +#[async_trait] +impl TableFormat for ObjectTable { + type P = String; + type S = String; + type E = Object; + + async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { + unimplemented!() + } +} diff --git a/src/server.rs b/src/server.rs index 1f1ac2af..c41ee9b7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -21,7 +21,7 @@ pub struct Garage { pub table_rpc_handlers: HashMap>, - pub version_table: Arc>, + pub object_table: Arc>, } impl Garage { @@ -35,25 +35,25 @@ impl Garage { timeout: DEFAULT_TIMEOUT, }; - let version_table = Arc::new(Table::new( - VersionTable{garage: RwLock::new(None)}, + let object_table = Arc::new(Table::new( + ObjectTable{garage: RwLock::new(None)}, system.clone(), &db, - "version".to_string(), + "object".to_string(), meta_rep_param.clone())); let mut garage = Self{ db, system: system.clone(), table_rpc_handlers: HashMap::new(), - version_table, + object_table, }; garage.table_rpc_handlers.insert( - garage.version_table.name.clone(), - garage.version_table.clone().rpc_handler()); + garage.object_table.name.clone(), + garage.object_table.clone().rpc_handler()); let garage = Arc::new(garage); - *garage.version_table.instance.garage.write().await = Some(garage.clone()); + *garage.object_table.instance.garage.write().await = Some(garage.clone()); garage } } diff --git a/src/table.rs b/src/table.rs index df82e9c7..55ae9229 100644 --- a/src/table.rs +++ b/src/table.rs @@ -64,11 +64,11 @@ pub struct Partition { pub other_nodes: Vec, } -pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync { +pub trait PartitionKey { fn hash(&self) -> Hash; } -pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { +pub trait SortKey { fn sort_key(&self) -> &[u8]; } @@ -87,33 +87,21 @@ impl SortKey for EmptySortKey { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct StringKey(String); -impl PartitionKey for StringKey { +impl> PartitionKey for T { fn hash(&self) -> Hash { - hash(self.0.as_bytes()) + hash(self.as_ref().as_bytes()) } } -impl SortKey for StringKey { +impl> SortKey for T { fn sort_key(&self) -> &[u8] { - self.0.as_bytes() - } -} -impl AsRef for StringKey { - fn as_ref(&self) -> &str { - &self.0 - } -} -impl From<&str> for StringKey { - fn from(s: &str) -> StringKey { - StringKey(s.to_string()) + self.as_ref().as_bytes() } } #[async_trait] pub trait TableFormat: Send + Sync { - type P: PartitionKey; - type S: SortKey; + type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry; async fn updated(&self, old: Option<&Self::E>, new: &Self::E); diff --git a/src/version_table.rs b/src/version_table.rs deleted file mode 100644 index 1542dc42..00000000 --- a/src/version_table.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::sync::Arc; -use serde::{Serialize, Deserialize}; -use async_trait::async_trait; -use tokio::sync::RwLock; - -use crate::data::*; -use crate::table::*; -use crate::server::Garage; - - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct VersionMeta { - pub bucket: StringKey, - pub key: StringKey, - - pub timestamp: u64, - pub uuid: UUID, - - pub mime_type: String, - pub size: u64, - pub is_complete: bool, - - pub data: VersionData, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum VersionData { - DeleteMarker, - Inline(#[serde(with="serde_bytes")] Vec), - FirstBlock(Hash), -} - -pub struct VersionTable { - pub garage: RwLock>>, -} - -impl Entry for VersionMeta { - fn partition_key(&self) -> &StringKey { - &self.bucket - } - fn sort_key(&self) -> &StringKey { - &self.key - } - - fn merge(&mut self, other: &Self) { - unimplemented!() - } -} - -#[async_trait] -impl TableFormat for VersionTable { - type P = StringKey; - type S = StringKey; - type E = VersionMeta; - - async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { - unimplemented!() - } -}