From 5967c5a5af430855fbd73f380041d63bd82f5ce1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 12 Apr 2020 13:03:55 +0200 Subject: [PATCH] Refactor a bit --- src/block_ref_table.rs | 15 ++++---- src/object_table.rs | 12 +++---- src/server.rs | 79 ++++++++++++++++++++++-------------------- src/version_table.rs | 13 ++++--- 4 files changed, 61 insertions(+), 58 deletions(-) diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index f3a14d8..3e5fb0a 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -1,11 +1,11 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::RwLock; use crate::data::*; -use crate::server::Garage; use crate::table::*; +use crate::background::*; +use crate::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { @@ -35,7 +35,8 @@ impl Entry for BlockRef { } pub struct BlockRefTable { - pub garage: RwLock>>, + pub background: Arc, + pub block_manager: Arc, } #[async_trait] @@ -45,19 +46,17 @@ impl TableFormat for BlockRefTable { type E = BlockRef; async fn updated(&self, old: Option, new: Self::E) { - let garage = self.garage.read().await.as_ref().cloned().unwrap(); - let was_before = old.map(|x| !x.deleted).unwrap_or(false); let is_after = !new.deleted; if is_after && !was_before { - if let Err(e) = garage.block_manager.block_incref(&new.block) { + if let Err(e) = self.block_manager.block_incref(&new.block) { eprintln!("Failed to incref block {:?}: {}", &new.block, e); } } if was_before && !is_after { - if let Err(e) = garage + if let Err(e) = self .block_manager - .block_decref(&new.block, &garage.background) + .block_decref(&new.block, &self.background) { eprintln!("Failed to decref block {:?}: {}", &new.block, e); } diff --git a/src/object_table.rs b/src/object_table.rs index 63dabf2..8ce4956 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -1,11 +1,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::RwLock; use crate::data::*; -use crate::server::Garage; use crate::table::*; +use crate::background::BackgroundRunner; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -88,7 +87,8 @@ impl Entry for Object { } pub struct ObjectTable { - pub garage: RwLock>>, + pub background: Arc, + pub version_table: Arc>, } #[async_trait] @@ -98,8 +98,8 @@ impl TableFormat for ObjectTable { type E = Object; async fn updated(&self, old: Option, new: Self::E) { - let garage = self.garage.read().await.as_ref().cloned().unwrap(); - garage.clone().background.spawn(async move { + let version_table = self.version_table.clone(); + self.background.spawn(async move { // Propagate deletion of old versions if let Some(old_v) = old { for v in old_v.versions.iter() { @@ -115,7 +115,7 @@ impl TableFormat for ObjectTable { bucket: old_v.bucket.clone(), key: old_v.key.clone(), }; - garage.version_table.insert(&deleted_version).await?; + version_table.insert(&deleted_version).await?; } } } diff --git a/src/server.rs b/src/server.rs index e38e858..29a2dbc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::watch; -use tokio::sync::RwLock; use crate::api_server; use crate::background::*; @@ -36,12 +35,21 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + + pub tls: TlsConfig, +} + +#[derive(Deserialize, Debug)] +pub struct TlsConfig { + pub ca_cert: Option, + pub node_cert: Option, + pub node_key: Option, } pub struct Garage { pub db: sled::Db, pub system: Arc, - pub block_manager: BlockManager, + pub block_manager: Arc, pub background: Arc, pub table_rpc_handlers: HashMap>, @@ -58,36 +66,10 @@ impl Garage { db: sled::Db, background: Arc, ) -> Arc { - let block_manager = BlockManager::new(&db, config.data_dir.clone()); + let block_manager = Arc::new(BlockManager::new(&db, config.data_dir.clone())); let system = Arc::new(System::new(config, id, background.clone())); - let meta_rep_param = TableReplicationParams { - replication_factor: system.config.meta_replication_factor, - write_quorum: (system.config.meta_replication_factor + 1) / 2, - read_quorum: (system.config.meta_replication_factor + 1) / 2, - timeout: DEFAULT_TIMEOUT, - }; - - let object_table = Arc::new(Table::new( - ObjectTable { - garage: RwLock::new(None), - }, - system.clone(), - &db, - "object".to_string(), - meta_rep_param.clone(), - )); - let version_table = Arc::new(Table::new( - VersionTable { - garage: RwLock::new(None), - }, - system.clone(), - &db, - "version".to_string(), - meta_rep_param.clone(), - )); - let data_rep_param = TableReplicationParams { replication_factor: system.config.data_replication_factor, write_quorum: (system.config.data_replication_factor + 1) / 2, @@ -95,15 +77,44 @@ impl Garage { timeout: DEFAULT_TIMEOUT, }; + let meta_rep_param = TableReplicationParams { + replication_factor: system.config.meta_replication_factor, + write_quorum: (system.config.meta_replication_factor + 1) / 2, + read_quorum: (system.config.meta_replication_factor + 1) / 2, + timeout: DEFAULT_TIMEOUT, + }; + let block_ref_table = Arc::new(Table::new( BlockRefTable { - garage: RwLock::new(None), + background: background.clone(), + block_manager: block_manager.clone(), }, system.clone(), &db, "block_ref".to_string(), data_rep_param.clone(), )); + let version_table = Arc::new(Table::new( + VersionTable { + background: background.clone(), + block_ref_table: block_ref_table.clone(), + }, + system.clone(), + &db, + "version".to_string(), + meta_rep_param.clone(), + )); + let object_table = Arc::new(Table::new( + ObjectTable { + background: background.clone(), + version_table: version_table.clone(), + }, + system.clone(), + &db, + "object".to_string(), + meta_rep_param.clone(), + )); + let mut garage = Self { db, @@ -129,13 +140,7 @@ impl Garage { garage.block_ref_table.clone().rpc_handler(), ); - let garage = Arc::new(garage); - - *garage.object_table.instance.garage.write().await = Some(garage.clone()); - *garage.version_table.instance.garage.write().await = Some(garage.clone()); - *garage.block_ref_table.instance.garage.write().await = Some(garage.clone()); - - garage + Arc::new(garage) } } diff --git a/src/version_table.rs b/src/version_table.rs index 9ea0551..cb70c64 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -1,11 +1,10 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; -use tokio::sync::RwLock; use crate::data::*; -use crate::server::Garage; use crate::table::*; +use crate::background::BackgroundRunner; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { @@ -54,7 +53,8 @@ impl Entry for Version { } pub struct VersionTable { - pub garage: RwLock>>, + pub background: Arc, + pub block_ref_table: Arc>, } #[async_trait] @@ -64,8 +64,8 @@ impl TableFormat for VersionTable { type E = Version; async fn updated(&self, old: Option, new: Self::E) { - let garage = self.garage.read().await.as_ref().cloned().unwrap(); - garage.clone().background.spawn(async move { + let block_ref_table = self.block_ref_table.clone(); + self.background.spawn(async move { // Propagate deletion of version blocks if let Some(old_v) = old { if new.deleted && !old_v.deleted { @@ -78,8 +78,7 @@ impl TableFormat for VersionTable { deleted: true, }) .collect::>(); - garage - .block_ref_table + block_ref_table .insert_many(&deleted_block_refs[..]) .await?; }