diff --git a/src/api_server.rs b/src/api_server.rs index a92fd36b..a5e6e322 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -2,17 +2,20 @@ use std::sync::Arc; use std::net::SocketAddr; use std::collections::VecDeque; -use futures::stream::StreamExt; +use futures::stream::*; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use hyper::body::Bytes; use futures::future::Future; use crate::error::Error; use crate::data::*; +use crate::data; use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; +use crate::table::EmptySortKey; pub async fn run_api_server(garage: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); @@ -69,7 +72,7 @@ async fn handler_inner(garage: Arc, req: Request, addr: SocketAddr .to_string(); let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; Ok(Response::new(Body::from( - format!("Version UUID: {:?}", version_uuid), + format!("{:?}\n", version_uuid), ))) } _ => Err(Error::BadRequest(format!("Invalid method"))), @@ -94,43 +97,46 @@ async fn handle_put(garage: Arc, key: key.into(), versions: Vec::new(), }; - object.versions.push(Box::new(Version{ + object.versions.push(Box::new(ObjectVersion{ uuid: version_uuid.clone(), timestamp: now_msec(), mime_type: mime_type.to_string(), size: first_block.len() as u64, is_complete: false, - data: VersionData::DeleteMarker, + data: ObjectVersionData::DeleteMarker, })); if first_block.len() < INLINE_THRESHOLD { - object.versions[0].data = VersionData::Inline(first_block); + object.versions[0].data = ObjectVersionData::Inline(first_block); object.versions[0].is_complete = true; garage.object_table.insert(&object).await?; return Ok(version_uuid) } + let version = Version { + version: version_uuid.clone(), + deleted: false, + blocks: Vec::new(), + bucket: bucket.into(), + key: key.into(), + }; + let first_block_hash = hash(&first_block[..]); - object.versions[0].data = VersionData::FirstBlock(first_block_hash); + object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash.clone()); garage.object_table.insert(&object).await?; - let block_meta = BlockMeta{ - version_uuid: version_uuid.clone(), - offset: 0, - hash: hash(&first_block[..]), - }; let mut next_offset = first_block.len(); - let mut put_curr_block = put_block(garage.clone(), block_meta, first_block); + let mut put_curr_version_block = put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); + let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); + loop { - let (_, next_block) = futures::try_join!(put_curr_block, chunker.next())?; + let (_, _, next_block) = futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; if let Some(block) = next_block { - let block_meta = BlockMeta{ - version_uuid: version_uuid.clone(), - offset: next_offset as u64, - hash: hash(&block[..]), - }; - next_offset += block.len(); - put_curr_block = put_block(garage.clone(), block_meta, block); + let block_hash = hash(&block[..]); + let block_len = block.len(); + put_curr_version_block = put_version_block(garage.clone(), &version, next_offset as u64, block_hash.clone()); + put_curr_block = put_block(garage.clone(), block_hash, block); + next_offset += block_len; } else { break; } @@ -144,13 +150,23 @@ async fn handle_put(garage: Arc, Ok(version_uuid) } -async fn put_block(garage: Arc, meta: BlockMeta, data: Vec) -> Result<(), Error> { +async fn put_version_block(garage: Arc, version: &Version, offset: u64, hash: Hash) -> Result<(), Error> { + let mut version = version.clone(); + version.blocks.push(VersionBlock{ + offset, + hash, + }); + garage.version_table.insert(&version).await?; + Ok(()) +} + +async fn put_block(garage: Arc, hash: Hash, data: Vec) -> Result<(), Error> { let who = garage.system.members.read().await - .walk_ring(&meta.hash, garage.system.config.meta_replication_factor); + .walk_ring(&hash, garage.system.config.meta_replication_factor); rpc_try_call_many(garage.system.clone(), &who[..], &Message::PutBlock(PutBlockMessage{ - meta, + hash, data, }), (garage.system.config.meta_replication_factor+1)/2, @@ -160,6 +176,7 @@ async fn put_block(garage: Arc, meta: BlockMeta, data: Vec) -> Resul struct BodyChunker { body: Body, + read_all: bool, block_size: usize, buf: VecDeque, } @@ -168,17 +185,19 @@ impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { Self{ body, + read_all: false, block_size, buf: VecDeque::new(), } } async fn next(&mut self) -> Result>, Error> { - while self.buf.len() < self.block_size { + while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.body.next().await { let bytes = block?; + eprintln!("Body next: {} bytes", bytes.len()); self.buf.extend(&bytes[..]); } else { - break; + self.read_all = true; } } if self.buf.len() == 0 { @@ -213,13 +232,56 @@ async fn handle_get(garage: Arc, bucket: &str, key: &str) -> Result Err(Error::NotFound), - VersionData::Inline(bytes) => { + ObjectVersionData::DeleteMarker => Err(Error::NotFound), + ObjectVersionData::Inline(bytes) => { Ok(resp_builder.body(bytes.into())?) } - VersionData::FirstBlock(hash) => { - // TODO - unimplemented!() + ObjectVersionData::FirstBlock(first_block_hash) => { + let read_first_block = get_block(garage.clone(), &first_block_hash); + let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey); + + let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let version = match version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let mut blocks = version.blocks.iter() + .map(|vb| (vb.hash.clone(), None)) + .collect::>(); + blocks[0].1 = Some(first_block); + + let block_futures = blocks.drain(..) + .map(move |(hash, data_opt)| async { + if let Some(data) = data_opt { + Ok(data) + } else { + get_block(garage.clone(), &hash).await + .map_err(|e| format!("{}", e)) + } + }); + let body_stream = futures::stream::iter(block_futures).buffered(2); + let body = Body::wrap_stream(body_stream); + Ok(resp_builder.body(body)?) } } } + +async fn get_block(garage: Arc, hash: &Hash) -> Result, Error> { + let who = garage.system.members.read().await + .walk_ring(&hash, garage.system.config.meta_replication_factor); + let resps = rpc_try_call_many(garage.system.clone(), + &who[..], + &Message::GetBlock(hash.clone()), + 1, + DEFAULT_TIMEOUT).await?; + + for resp in resps { + if let Message::PutBlock(pbm) = resp { + if data::hash(&pbm.data) == *hash { + return Ok(pbm.data) + } + } + } + Err(Error::Message(format!("No valid blocks returned"))) +} diff --git a/src/block.rs b/src/block.rs new file mode 100644 index 00000000..b9e7eee8 --- /dev/null +++ b/src/block.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; +use std::path::PathBuf; + +use tokio::fs; +use tokio::prelude::*; + +use crate::error::Error; +use crate::server::Garage; +use crate::proto::*; +use crate::data::*; + +fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf { + let mut path = garage.system.config.data_dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path +} + +pub async fn write_block(garage: Arc, hash: &Hash, data: &[u8]) -> Result { + garage.fs_lock.lock().await; + + let mut path = block_dir(&garage, hash); + fs::create_dir_all(&path).await?; + + path.push(hex::encode(hash)); + if fs::metadata(&path).await.is_ok() { + return Ok(Message::Ok) + } + + let mut f = fs::File::create(path).await?; + f.write_all(data).await?; + drop(f); + + Ok(Message::Ok) +} + +pub async fn read_block(garage: Arc, hash: &Hash) -> Result { + let mut path = block_dir(&garage, hash); + path.push(hex::encode(hash)); + + let mut f = fs::File::open(path).await?; + let mut data = vec![]; + f.read_to_end(&mut data).await?; + + Ok(Message::PutBlock(PutBlockMessage{ + hash: hash.clone(), + data, + })) +} diff --git a/src/data.rs b/src/data.rs index 91b21b02..a538f98a 100644 --- a/src/data.rs +++ b/src/data.rs @@ -136,16 +136,4 @@ pub struct SplitpointMeta { } pub use crate::object_table::*; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct BlockMeta { - pub version_uuid: UUID, - pub offset: u64, - pub hash: Hash, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct BlockReverseMeta { - pub versions: Vec, - pub deleted_versions: Vec, -} +pub use crate::version_table::*; diff --git a/src/main.rs b/src/main.rs index 2303e7a9..324f0d49 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,8 @@ mod membership; mod table; mod object_table; +mod version_table; +mod block; mod server; mod rpc_server; diff --git a/src/membership.rs b/src/membership.rs index 314495e9..3f7a84c4 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -167,11 +167,9 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result { .open(path.as_path())?; let mut net_config_bytes = vec![]; - file.read_to_end(&mut net_config_bytes) - .expect("Failure when reading network_config"); + file.read_to_end(&mut net_config_bytes)?; - let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..]) - .expect("Invalid or corrupt network_config file"); + let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; Ok(net_config) } @@ -180,9 +178,12 @@ impl System { pub fn new(config: Config, id: UUID) -> Self { let net_config = match read_network_config(&config.metadata_dir) { Ok(x) => x, - Err(_) => NetworkConfig{ - members: HashMap::new(), - version: 0, + Err(e) => { + println!("No valid previous network configuration stored ({}), starting fresh.", e); + NetworkConfig{ + members: HashMap::new(), + version: 0, + } }, }; let mut members = Members{ diff --git a/src/object_table.rs b/src/object_table.rs index 092dddf8..37b9fc0a 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -8,16 +8,20 @@ use crate::table::*; use crate::server::Garage; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { + // Primary key pub bucket: String, + + // Sort key pub key: String, - pub versions: Vec>, + // Data + pub versions: Vec>, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Version { +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { pub uuid: UUID, pub timestamp: u64, @@ -25,20 +29,16 @@ pub struct Version { pub size: u64, pub is_complete: bool, - pub data: VersionData, + pub data: ObjectVersionData, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum VersionData { +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { DeleteMarker, Inline(#[serde(with="serde_bytes")] Vec), FirstBlock(Hash), } -pub struct ObjectTable { - pub garage: RwLock>>, -} - impl Entry for Object { fn partition_key(&self) -> &String { &self.bucket @@ -47,25 +47,20 @@ impl Entry for Object { &self.key } - fn merge(&mut self, other: &Self) -> bool { - let mut has_change = false; - + fn merge(&mut self, other: &Self) { for other_v in other.versions.iter() { match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) { Ok(i) => { let mut v = &mut self.versions[i]; if other_v.size > v.size { v.size = other_v.size; - has_change = true; } if other_v.is_complete && !v.is_complete { v.is_complete = true; - has_change = true; } } Err(i) => { self.versions.insert(i, other_v.clone()); - has_change = true; } } } @@ -78,11 +73,13 @@ impl Entry for Object { if let Some(last_vi) = last_complete { self.versions = self.versions.drain(last_vi..).collect::>(); } - - has_change } } +pub struct ObjectTable { + pub garage: RwLock>>, +} + #[async_trait] impl TableFormat for ObjectTable { type P = String; diff --git a/src/proto.rs b/src/proto.rs index 99ab8fbe..df64a438 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -17,6 +17,7 @@ pub enum Message { AdvertiseNodesUp(Vec), AdvertiseConfig(NetworkConfig), + GetBlock(Hash), PutBlock(PutBlockMessage), TableRPC(String, #[serde(with = "serde_bytes")] Vec), @@ -39,7 +40,7 @@ pub struct AdvertisedNode { #[derive(Debug, Serialize, Deserialize)] pub struct PutBlockMessage { - pub meta: BlockMeta, + pub hash: Hash, #[serde(with="serde_bytes")] pub data: Vec, diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 9c9726b1..134f8e98 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -102,7 +102,7 @@ impl RpcClient { timeout: Duration) -> Result { - let uri = format!("http://{}/", to_addr); + let uri = format!("http://{}/rpc", to_addr); let req = Request::builder() .method(Method::POST) .uri(uri) diff --git a/src/rpc_server.rs b/src/rpc_server.rs index d3bc174d..7d8df658 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use serde::Serialize; use bytes::IntoBuf; use hyper::service::{make_service_fn, service_fn}; use hyper::server::conn::AddrStream; @@ -11,6 +12,16 @@ use crate::error::Error; use crate::data::rmp_to_vec_all_named; use crate::proto::Message; use crate::server::Garage; +use crate::block::*; + +fn debug_serialize(x: T) -> Result { + let ss = serde_json::to_string(&x)?; + if ss.len() > 100 { + Ok(ss[..100].to_string()) + } else { + Ok(ss) + } +} fn err_to_msg(x: Result) -> Message { match x { @@ -29,7 +40,7 @@ async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> R let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - eprintln!("RPC from {}: {}", addr, serde_json::to_string(&msg)?); + eprintln!("RPC from {}: {} ({} bytes)", addr, debug_serialize(&msg)?, whole_body.len()); let sys = garage.system.clone(); let resp = err_to_msg(match &msg { @@ -38,6 +49,12 @@ async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> R Message::PullConfig => sys.handle_pull_config().await, Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, + Message::PutBlock(m) => { + write_block(garage, &m.hash, &m.data).await + } + Message::GetBlock(h) => { + read_block(garage, &h).await + } Message::TableRPC(table, msg) => { if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) { rpc_handler.handle(&msg[..]).await @@ -50,7 +67,7 @@ async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> R _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), }); - eprintln!("reply to {}: {}", addr, serde_json::to_string(&resp)?); + eprintln!("reply to {}: {}", addr, debug_serialize(&resp)?); Ok(Response::new(Body::from( rmp_to_vec_all_named(&resp)? diff --git a/src/server.rs b/src/server.rs index cc7e5dce..d9f98164 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,7 +5,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use futures::channel::oneshot; use serde::Deserialize; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use crate::data::*; use crate::proto::*; @@ -18,10 +18,12 @@ use crate::table::*; pub struct Garage { pub db: sled::Db, pub system: Arc, + pub fs_lock: Mutex<()>, pub table_rpc_handlers: HashMap>, pub object_table: Arc>, + pub version_table: Arc>, } impl Garage { @@ -41,19 +43,34 @@ impl Garage { &db, "object".to_string(), meta_rep_param.clone())); + let version_table = Arc::new(Table::new( + VersionTable{garage: RwLock::new(None)}, + system.clone(), + &db, + "version".to_string(), + meta_rep_param.clone())); let mut garage = Self{ db, system: system.clone(), + fs_lock: Mutex::new(()), table_rpc_handlers: HashMap::new(), object_table, + version_table, }; + garage.table_rpc_handlers.insert( garage.object_table.name.clone(), garage.object_table.clone().rpc_handler()); + garage.table_rpc_handlers.insert( + garage.version_table.name.clone(), + garage.version_table.clone().rpc_handler()); let garage = Arc::new(garage); + *garage.object_table.instance.garage.write().await = Some(garage.clone()); + *garage.version_table.instance.garage.write().await = Some(garage.clone()); + garage } } diff --git a/src/table.rs b/src/table.rs index 6d309967..def0d8b8 100644 --- a/src/table.rs +++ b/src/table.rs @@ -72,11 +72,11 @@ pub trait SortKey { fn sort_key(&self) -> &[u8]; } -pub trait Entry: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { +pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - fn merge(&mut self, other: &Self) -> bool; + fn merge(&mut self, other: &Self); } #[derive(Clone, Serialize, Deserialize)] @@ -98,6 +98,12 @@ impl> SortKey for T { } } +impl PartitionKey for Hash { + fn hash(&self) -> Hash { + self.clone() + } +} + #[async_trait] pub trait TableFormat: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -159,9 +165,9 @@ impl Table { ret = match ret { None => Some(v), Some(mut x) => { - let updated = x.merge(&v); - if updated { + if x != v { not_all_same = true; + x.merge(&v); } Some(x) } @@ -239,17 +245,16 @@ impl Table { let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { - let mut new_entry = update.clone(); - - let old_entry = match db.get(&tree_key)? { + let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) .map_err(Error::RMPDecode) .map_err(sled::ConflictableTransactionError::Abort)?; - new_entry.merge(&old_entry); - Some(old_entry) + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) } - None => None + None => (None, update.clone()) }; let new_bytes = rmp_to_vec_all_named(&new_entry) diff --git a/src/version_table.rs b/src/version_table.rs new file mode 100644 index 00000000..8c48d3af --- /dev/null +++ b/src/version_table.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; +use serde::{Serialize, Deserialize}; +use async_trait::async_trait; +use tokio::sync::RwLock; + +use crate::data::*; +use crate::table::*; +use crate::server::Garage; + + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Version { + // Primary key + pub version: UUID, + + // Actual data: the blocks for this version + pub deleted: bool, + pub blocks: Vec, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + pub bucket: String, + pub key: String, +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct VersionBlock { + pub offset: u64, + pub hash: Hash, +} + +impl Entry for Version { + fn partition_key(&self) -> &Hash { + &self.version + } + fn sort_key(&self) -> &EmptySortKey { + &EmptySortKey + } + + fn merge(&mut self, other: &Self) { + if other.deleted { + self.deleted = true; + self.blocks.clear(); + } else if !self.deleted { + for bi in other.blocks.iter() { + match self.blocks.binary_search_by(|x| x.offset.cmp(&bi.offset)) { + Ok(_) => (), + Err(pos) => { + self.blocks.insert(pos, bi.clone()); + } + } + } + } + } +} + +pub struct VersionTable { + pub garage: RwLock>>, +} + +#[async_trait] +impl TableFormat for VersionTable { + type P = Hash; + type S = EmptySortKey; + type E = Version; + + async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { + //unimplemented!() + // TODO + } +}