Refactor a bit

This commit is contained in:
Alex 2020-04-12 13:03:55 +02:00
parent 419c70e506
commit 5967c5a5af
4 changed files with 61 additions and 58 deletions

View file

@ -1,11 +1,11 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::*; use crate::data::*;
use crate::server::Garage;
use crate::table::*; use crate::table::*;
use crate::background::*;
use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef { pub struct BlockRef {
@ -35,7 +35,8 @@ impl Entry<Hash, UUID> for BlockRef {
} }
pub struct BlockRefTable { pub struct BlockRefTable {
pub garage: RwLock<Option<Arc<Garage>>>, pub background: Arc<BackgroundRunner>,
pub block_manager: Arc<BlockManager>,
} }
#[async_trait] #[async_trait]
@ -45,19 +46,17 @@ impl TableFormat for BlockRefTable {
type E = BlockRef; type E = BlockRef;
async fn updated(&self, old: Option<Self::E>, new: Self::E) { async fn updated(&self, old: Option<Self::E>, new: Self::E) {
let garage = self.garage.read().await.as_ref().cloned().unwrap();
let was_before = old.map(|x| !x.deleted).unwrap_or(false); let was_before = old.map(|x| !x.deleted).unwrap_or(false);
let is_after = !new.deleted; let is_after = !new.deleted;
if is_after && !was_before { if is_after && !was_before {
if let Err(e) = garage.block_manager.block_incref(&new.block) { if let Err(e) = self.block_manager.block_incref(&new.block) {
eprintln!("Failed to incref block {:?}: {}", &new.block, e); eprintln!("Failed to incref block {:?}: {}", &new.block, e);
} }
} }
if was_before && !is_after { if was_before && !is_after {
if let Err(e) = garage if let Err(e) = self
.block_manager .block_manager
.block_decref(&new.block, &garage.background) .block_decref(&new.block, &self.background)
{ {
eprintln!("Failed to decref block {:?}: {}", &new.block, e); eprintln!("Failed to decref block {:?}: {}", &new.block, e);
} }

View file

@ -1,11 +1,10 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::*; use crate::data::*;
use crate::server::Garage;
use crate::table::*; use crate::table::*;
use crate::background::BackgroundRunner;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
@ -88,7 +87,8 @@ impl Entry<String, String> for Object {
} }
pub struct ObjectTable { pub struct ObjectTable {
pub garage: RwLock<Option<Arc<Garage>>>, pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable>>,
} }
#[async_trait] #[async_trait]
@ -98,8 +98,8 @@ impl TableFormat for ObjectTable {
type E = Object; type E = Object;
async fn updated(&self, old: Option<Self::E>, new: Self::E) { async fn updated(&self, old: Option<Self::E>, new: Self::E) {
let garage = self.garage.read().await.as_ref().cloned().unwrap(); let version_table = self.version_table.clone();
garage.clone().background.spawn(async move { self.background.spawn(async move {
// Propagate deletion of old versions // Propagate deletion of old versions
if let Some(old_v) = old { if let Some(old_v) = old {
for v in old_v.versions.iter() { for v in old_v.versions.iter() {
@ -115,7 +115,7 @@ impl TableFormat for ObjectTable {
bucket: old_v.bucket.clone(), bucket: old_v.bucket.clone(),
key: old_v.key.clone(), key: old_v.key.clone(),
}; };
garage.version_table.insert(&deleted_version).await?; version_table.insert(&deleted_version).await?;
} }
} }
} }

View file

@ -6,7 +6,6 @@ use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::sync::RwLock;
use crate::api_server; use crate::api_server;
use crate::background::*; use crate::background::*;
@ -36,12 +35,21 @@ pub struct Config {
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub data_replication_factor: usize, pub data_replication_factor: usize,
pub tls: TlsConfig,
}
#[derive(Deserialize, Debug)]
pub struct TlsConfig {
pub ca_cert: Option<String>,
pub node_cert: Option<String>,
pub node_key: Option<String>,
} }
pub struct Garage { pub struct Garage {
pub db: sled::Db, pub db: sled::Db,
pub system: Arc<System>, pub system: Arc<System>,
pub block_manager: BlockManager, pub block_manager: Arc<BlockManager>,
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>, pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
@ -58,36 +66,10 @@ impl Garage {
db: sled::Db, db: sled::Db,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
) -> Arc<Self> { ) -> Arc<Self> {
let block_manager = BlockManager::new(&db, config.data_dir.clone()); let block_manager = Arc::new(BlockManager::new(&db, config.data_dir.clone()));
let system = Arc::new(System::new(config, id, background.clone())); let system = Arc::new(System::new(config, id, background.clone()));
let meta_rep_param = TableReplicationParams {
replication_factor: system.config.meta_replication_factor,
write_quorum: (system.config.meta_replication_factor + 1) / 2,
read_quorum: (system.config.meta_replication_factor + 1) / 2,
timeout: DEFAULT_TIMEOUT,
};
let object_table = Arc::new(Table::new(
ObjectTable {
garage: RwLock::new(None),
},
system.clone(),
&db,
"object".to_string(),
meta_rep_param.clone(),
));
let version_table = Arc::new(Table::new(
VersionTable {
garage: RwLock::new(None),
},
system.clone(),
&db,
"version".to_string(),
meta_rep_param.clone(),
));
let data_rep_param = TableReplicationParams { let data_rep_param = TableReplicationParams {
replication_factor: system.config.data_replication_factor, replication_factor: system.config.data_replication_factor,
write_quorum: (system.config.data_replication_factor + 1) / 2, write_quorum: (system.config.data_replication_factor + 1) / 2,
@ -95,15 +77,44 @@ impl Garage {
timeout: DEFAULT_TIMEOUT, timeout: DEFAULT_TIMEOUT,
}; };
let meta_rep_param = TableReplicationParams {
replication_factor: system.config.meta_replication_factor,
write_quorum: (system.config.meta_replication_factor + 1) / 2,
read_quorum: (system.config.meta_replication_factor + 1) / 2,
timeout: DEFAULT_TIMEOUT,
};
let block_ref_table = Arc::new(Table::new( let block_ref_table = Arc::new(Table::new(
BlockRefTable { BlockRefTable {
garage: RwLock::new(None), background: background.clone(),
block_manager: block_manager.clone(),
}, },
system.clone(), system.clone(),
&db, &db,
"block_ref".to_string(), "block_ref".to_string(),
data_rep_param.clone(), data_rep_param.clone(),
)); ));
let version_table = Arc::new(Table::new(
VersionTable {
background: background.clone(),
block_ref_table: block_ref_table.clone(),
},
system.clone(),
&db,
"version".to_string(),
meta_rep_param.clone(),
));
let object_table = Arc::new(Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
},
system.clone(),
&db,
"object".to_string(),
meta_rep_param.clone(),
));
let mut garage = Self { let mut garage = Self {
db, db,
@ -129,13 +140,7 @@ impl Garage {
garage.block_ref_table.clone().rpc_handler(), garage.block_ref_table.clone().rpc_handler(),
); );
let garage = Arc::new(garage); Arc::new(garage)
*garage.object_table.instance.garage.write().await = Some(garage.clone());
*garage.version_table.instance.garage.write().await = Some(garage.clone());
*garage.block_ref_table.instance.garage.write().await = Some(garage.clone());
garage
} }
} }

View file

@ -1,11 +1,10 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
use crate::data::*; use crate::data::*;
use crate::server::Garage;
use crate::table::*; use crate::table::*;
use crate::background::BackgroundRunner;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
@ -54,7 +53,8 @@ impl Entry<Hash, EmptySortKey> for Version {
} }
pub struct VersionTable { pub struct VersionTable {
pub garage: RwLock<Option<Arc<Garage>>>, pub background: Arc<BackgroundRunner>,
pub block_ref_table: Arc<Table<BlockRefTable>>,
} }
#[async_trait] #[async_trait]
@ -64,8 +64,8 @@ impl TableFormat for VersionTable {
type E = Version; type E = Version;
async fn updated(&self, old: Option<Self::E>, new: Self::E) { async fn updated(&self, old: Option<Self::E>, new: Self::E) {
let garage = self.garage.read().await.as_ref().cloned().unwrap(); let block_ref_table = self.block_ref_table.clone();
garage.clone().background.spawn(async move { self.background.spawn(async move {
// Propagate deletion of version blocks // Propagate deletion of version blocks
if let Some(old_v) = old { if let Some(old_v) = old {
if new.deleted && !old_v.deleted { if new.deleted && !old_v.deleted {
@ -78,8 +78,7 @@ impl TableFormat for VersionTable {
deleted: true, deleted: true,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
garage block_ref_table
.block_ref_table
.insert_many(&deleted_block_refs[..]) .insert_many(&deleted_block_refs[..])
.await?; .await?;
} }