From 060f83edf182a7d9c9a68823aee9a184d5a3c13b Mon Sep 17 00:00:00 2001 From: Yureka Date: Mon, 4 Mar 2024 18:43:08 +0100 Subject: [PATCH] other refactors --- src/block/manager.rs | 11 ++--------- src/block/resync.rs | 10 ++++++---- src/model/garage.rs | 13 +------------ src/rpc/layout/helper.rs | 13 +------------ src/rpc/layout/manager.rs | 18 ++++++++++++------ src/table/replication/sharded.rs | 24 +++++++++--------------- src/util/crdt/lww.rs | 2 +- 7 files changed, 32 insertions(+), 59 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index f4d8ee56..218ef9eb 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -33,8 +33,6 @@ use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; -use garage_table::replication::{TableReplication, TableShardedReplication}; - use crate::block::*; use crate::layout::*; use crate::metrics::*; @@ -74,9 +72,6 @@ impl Rpc for BlockRpc { /// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { - /// Replication strategy, allowing to find on which node blocks should be located - pub replication: TableShardedReplication, - /// Data layout pub(crate) data_layout: ArcSwap, /// Data layout persister @@ -122,7 +117,6 @@ impl BlockManager { data_dir: DataDirEnum, data_fsync: bool, compression_level: Option, - replication: TableShardedReplication, system: Arc, ) -> Result, Error> { // Load or compute layout, i.e. assignment of data blocks to the different data directories @@ -163,7 +157,6 @@ impl BlockManager { let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); let block_manager = Arc::new(Self { - replication, data_layout: ArcSwap::new(Arc::new(data_layout)), data_layout_persister, data_fsync, @@ -354,7 +347,7 @@ impl BlockManager { data: Bytes, order_tag: Option, ) -> Result<(), Error> { - let who = self.replication.write_sets(&hash); + let who = self.system.layout_manager.write_sets_of(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await @@ -374,7 +367,7 @@ impl BlockManager { who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) - .with_quorum(self.replication.write_quorum()), + .with_quorum(self.system.layout_manager.write_quorum()), ) .await?; diff --git a/src/block/resync.rs b/src/block/resync.rs index 15f210e4..180e7bcf 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -28,8 +28,6 @@ use garage_util::tranquilizer::Tranquilizer; use garage_rpc::system::System; use garage_rpc::*; -use garage_table::replication::TableReplication; - use crate::manager::*; // The delay between the time where a resync operation fails @@ -377,8 +375,12 @@ impl BlockResyncManager { info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); - let mut who = manager.replication.storage_nodes(hash); - if who.len() < manager.replication.write_quorum() { + let mut who = manager + .system + .layout_manager + .layout() + .storage_nodes_of(hash); + if who.len() < manager.system.layout_manager.write_quorum() { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } who.retain(|id| *id != manager.system.id); diff --git a/src/model/garage.rs b/src/model/garage.rs index 19f58077..482e187d 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -230,18 +230,8 @@ impl Garage { info!("Initialize membership management system..."); let system = 
System::new(network_key, replication_factor, consistency_mode, &config)?; - let data_rep_param = TableShardedReplication { - system: system.clone(), - replication_factor: replication_factor.into(), - write_quorum: replication_factor.write_quorum(consistency_mode), - read_quorum: 1, - }; - let meta_rep_param = TableShardedReplication { - system: system.clone(), - replication_factor: replication_factor.into(), - write_quorum: replication_factor.write_quorum(consistency_mode), - read_quorum: replication_factor.read_quorum(consistency_mode), + layout_manager: system.layout_manager.clone(), }; let control_rep_param = TableFullReplication { @@ -254,7 +244,6 @@ impl Garage { config.data_dir.clone(), config.data_fsync, config.compression_level, - data_rep_param, system.clone(), )?; block_manager.register_bg_vars(&mut bg_vars); diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 2835347a..cd72e2d3 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::*; -use crate::replication_mode::*; +use crate::replication_mode::ReplicationFactor; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct RpcLayoutDigest { @@ -30,7 +30,6 @@ pub struct SyncLayoutDigest { pub struct LayoutHelper { replication_factor: ReplicationFactor, - consistency_mode: ConsistencyMode, layout: Option, // cached values @@ -59,7 +58,6 @@ impl Deref for LayoutHelper { impl LayoutHelper { pub fn new( replication_factor: ReplicationFactor, - consistency_mode: ConsistencyMode, mut layout: LayoutHistory, mut ack_lock: HashMap, ) -> Self { @@ -68,13 +66,6 @@ impl LayoutHelper { // correct and we have rapid access to important values such as // the layout versions to use when reading to ensure consistency. - if consistency_mode != ConsistencyMode::Consistent { - // Fast path for when no consistency is required. - // In this case we only need to keep the last version of the layout, - // we don't care about coordinating stuff in the cluster. 
- layout.keep_current_version_only(); - } - layout.cleanup_old_versions(); let all_nodes = layout.get_all_nodes(); @@ -117,7 +108,6 @@ impl LayoutHelper { LayoutHelper { replication_factor, - consistency_mode, layout: Some(layout), ack_map_min, sync_map_min, @@ -143,7 +133,6 @@ impl LayoutHelper { if changed { *self = Self::new( self.replication_factor, - self.consistency_mode, self.layout.take().unwrap(), std::mem::take(&mut self.ack_lock), ); diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 8a6eb1c3..846eea47 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -21,6 +21,7 @@ use crate::system::*; pub struct LayoutManager { node_id: Uuid, replication_factor: ReplicationFactor, + consistency_mode: ConsistencyMode, persist_cluster_layout: Persister, layout: Arc>, @@ -64,12 +65,8 @@ impl LayoutManager { } }; - let mut cluster_layout = LayoutHelper::new( - replication_factor, - consistency_mode, - cluster_layout, - Default::default(), - ); + let mut cluster_layout = + LayoutHelper::new(replication_factor, cluster_layout, Default::default()); cluster_layout.update_trackers(node_id.into()); let layout = Arc::new(RwLock::new(cluster_layout)); @@ -85,6 +82,7 @@ impl LayoutManager { Ok(Arc::new(Self { node_id: node_id.into(), replication_factor, + consistency_mode, persist_cluster_layout, layout, change_notify, @@ -141,6 +139,14 @@ impl LayoutManager { } } + pub fn read_quorum(self: &Arc) -> usize { + self.replication_factor.read_quorum(self.consistency_mode) + } + + pub fn write_quorum(self: &Arc) -> usize { + self.replication_factor.write_quorum(self.consistency_mode) + } + // ---- ACK LOCKING ---- pub fn write_sets_of(self: &Arc, position: &Hash) -> WriteLock>> { diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index e0245949..fa5e48d7 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,7 +1,7 @@ use std::sync::Arc; +use garage_rpc::layout::manager::LayoutManager; use garage_rpc::layout::*; -use garage_rpc::system::System; use garage_util::data::*; use crate::replication::*; @@ -15,42 +15,36 @@ use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { /// The membership manager of this node - pub system: Arc, - /// How many time each data should be replicated - pub replication_factor: usize, - /// How many nodes to contact for a read, should be at most `replication_factor` - pub read_quorum: usize, - /// How many nodes to contact for a write, should be at most `replication_factor` - pub write_quorum: usize, + pub layout_manager: Arc, } impl TableReplication for TableShardedReplication { type WriteSets = WriteLock>>; fn storage_nodes(&self, hash: &Hash) -> Vec { - self.system.cluster_layout().storage_nodes_of(hash) + self.layout_manager.layout().storage_nodes_of(hash) } fn read_nodes(&self, hash: &Hash) -> Vec { - self.system.cluster_layout().read_nodes_of(hash) + self.layout_manager.layout().read_nodes_of(hash) } fn read_quorum(&self) -> usize { - self.read_quorum + self.layout_manager.read_quorum() } fn write_sets(&self, hash: &Hash) -> Self::WriteSets { - self.system.layout_manager.write_sets_of(hash) + self.layout_manager.write_sets_of(hash) } fn write_quorum(&self) -> usize { - self.write_quorum + self.layout_manager.write_quorum() } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.cluster_layout().current().partition_of(hash) + self.layout_manager.layout().current().partition_of(hash) } fn sync_partitions(&self) -> SyncPartitions { - 
let layout = self.system.cluster_layout(); + let layout = self.layout_manager.layout(); let layout_version = layout.ack_map_min(); let mut partitions = layout diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs index 958844c9..0677a673 100644 --- a/src/util/crdt/lww.rs +++ b/src/util/crdt/lww.rs @@ -84,7 +84,7 @@ where &self.v } - /// Take the value inside the CRDT (discards the timesamp) + /// Take the value inside the CRDT (discards the timestamp) pub fn take(self) -> T { self.v }
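
For reviewers: below is a minimal, illustrative sketch of how the pieces fit together after this refactor, based only on the hunks above. It is not part of the patch. It assumes the in-tree crates (garage_rpc, garage_table, garage_util) and an already-initialized Arc<System> as built in Garage::new(); the function and variable names in the sketch itself are made up for illustration.

    // Illustrative sketch, not part of the patch: shows the post-refactor API
    // surface as it appears in the hunks above.
    use std::sync::Arc;

    use garage_rpc::system::System;
    use garage_table::replication::{TableReplication, TableShardedReplication};
    use garage_util::data::Hash;

    fn example(system: Arc<System>, hash: Hash) {
        // TableShardedReplication now only wraps the layout manager; the
        // replication_factor / read_quorum / write_quorum fields are gone.
        let sharded = TableShardedReplication {
            layout_manager: system.layout_manager.clone(),
        };

        // Quorums are derived on demand from LayoutManager, which now also
        // holds the consistency mode (moved out of LayoutHelper).
        let _read_quorum: usize = sharded.read_quorum();
        let _write_quorum: usize = system.layout_manager.write_quorum();

        // Node sets are resolved through the current layout, as the block
        // manager and resync code now do.
        let _storage_nodes = system.layout_manager.layout().storage_nodes_of(&hash);

        // Write sets (with their ack lock) come from the layout manager,
        // as in BlockManager::rpc_put_block() above.
        let _write_sets = system.layout_manager.write_sets_of(&hash);
    }

The practical effect is that quorum computation is centralized in LayoutManager, so read/write quorums always reflect the configured consistency mode instead of being frozen into each TableShardedReplication (and into BlockManager) at construction time.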