diff --git a/src/api_server.rs b/src/api_server.rs index 056d2aa..441fbe1 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -221,7 +221,7 @@ async fn put_block(garage: Arc, hash: Hash, data: Vec) -> Result<(), &who[..], &Message::PutBlock(PutBlockMessage { hash, data }), (garage.system.config.data_replication_factor + 1) / 2, - DEFAULT_TIMEOUT, + BLOCK_RW_TIMEOUT, ) .await?; Ok(()) @@ -368,7 +368,7 @@ async fn get_block(garage: Arc, hash: &Hash) -> Result, Error> { &who[..], &Message::GetBlock(hash.clone()), 1, - DEFAULT_TIMEOUT, + BLOCK_RW_TIMEOUT, ) .await?; diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index 0511ea2..4364b64 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -40,7 +40,7 @@ pub struct BlockRefTable { } #[async_trait] -impl TableFormat for BlockRefTable { +impl TableSchema for BlockRefTable { type P = Hash; type S = UUID; type E = BlockRef; diff --git a/src/object_table.rs b/src/object_table.rs index a3a0337..c04a809 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -92,7 +92,7 @@ pub struct ObjectTable { } #[async_trait] -impl TableFormat for ObjectTable { +impl TableSchema for ObjectTable { type P = String; type S = String; type E = Object; diff --git a/src/proto.rs b/src/proto.rs index b39f49e..7d8d389 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -4,7 +4,8 @@ use std::time::Duration; use crate::data::*; -pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +pub const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); #[derive(Debug, Serialize, Deserialize)] pub enum Message { diff --git a/src/server.rs b/src/server.rs index 0123eb9..8b49f10 100644 --- a/src/server.rs +++ b/src/server.rs @@ -84,7 +84,7 @@ impl Garage { timeout: DEFAULT_TIMEOUT, }; - let block_ref_table = Arc::new(Table::new( + let block_ref_table = Table::new( BlockRefTable { background: background.clone(), block_manager: block_manager.clone(), @@ -93,8 +93,8 @@ impl Garage { &db, "block_ref".to_string(), data_rep_param.clone(), - )); - let version_table = Arc::new(Table::new( + ); + let version_table = Table::new( VersionTable { background: background.clone(), block_ref_table: block_ref_table.clone(), @@ -103,8 +103,8 @@ impl Garage { &db, "version".to_string(), meta_rep_param.clone(), - )); - let object_table = Arc::new(Table::new( + ); + let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), @@ -113,7 +113,7 @@ impl Garage { &db, "object".to_string(), meta_rep_param.clone(), - )); + ); let mut garage = Self { db, diff --git a/src/table.rs b/src/table.rs index d0f2411..69d818c 100644 --- a/src/table.rs +++ b/src/table.rs @@ -13,7 +13,7 @@ use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; -pub struct Table { +pub struct Table { pub instance: F, pub name: String, @@ -38,12 +38,12 @@ pub trait TableRpcHandler { async fn handle(&self, rpc: &[u8]) -> Result, Error>; } -struct TableRpcHandlerAdapter { +struct TableRpcHandlerAdapter { table: Arc>, } #[async_trait] -impl TableRpcHandler for TableRpcHandlerAdapter { +impl TableRpcHandler for TableRpcHandlerAdapter { async fn handle(&self, rpc: &[u8]) -> Result, Error> { let msg = rmp_serde::decode::from_read_ref::<_, TableRPC>(rpc)?; let rep = self.table.handle(msg).await?; @@ -52,7 +52,7 @@ impl TableRpcHandler for TableRpcHandlerAdapter { } #[derive(Serialize, Deserialize)] -pub enum TableRPC { +pub enum TableRPC { Ok, ReadEntry(F::P, F::S), @@ -115,7 +115,7 @@ impl SortKey for Hash { } #[async_trait] -pub trait TableFormat: Send + Sync { +pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry; @@ -123,23 +123,23 @@ pub trait TableFormat: Send + Sync { async fn updated(&self, old: Option, new: Self::E); } -impl Table { +impl Table { pub fn new( instance: F, system: Arc, db: &sled::Db, name: String, param: TableReplicationParams, - ) -> Self { + ) -> Arc { let store = db.open_tree(&name).expect("Unable to open DB tree"); - Self { + Arc::new(Self { instance, name, system, store, partitions: Vec::new(), param, - } + }) } pub fn rpc_handler(self: Arc) -> Box { diff --git a/src/version_table.rs b/src/version_table.rs index 106527b..797b934 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -58,7 +58,7 @@ pub struct VersionTable { } #[async_trait] -impl TableFormat for VersionTable { +impl TableSchema for VersionTable { type P = Hash; type S = EmptySortKey; type E = Version;