parent ce042e48d4
commit 060f83edf1
7 changed files with 32 additions and 59 deletions
@@ -33,8 +33,6 @@ use garage_rpc::rpc_helper::OrderTag;
 use garage_rpc::system::System;
 use garage_rpc::*;
 
-use garage_table::replication::{TableReplication, TableShardedReplication};
-
 use crate::block::*;
 use crate::layout::*;
 use crate::metrics::*;

@@ -74,9 +72,6 @@ impl Rpc for BlockRpc {
 
 /// The block manager, handling block exchange between nodes, and block storage on local node
 pub struct BlockManager {
-	/// Replication strategy, allowing to find on which node blocks should be located
-	pub replication: TableShardedReplication,
-
 	/// Data layout
 	pub(crate) data_layout: ArcSwap<DataLayout>,
 	/// Data layout persister

@@ -122,7 +117,6 @@ impl BlockManager {
 		data_dir: DataDirEnum,
 		data_fsync: bool,
 		compression_level: Option<i32>,
-		replication: TableShardedReplication,
 		system: Arc<System>,
 	) -> Result<Arc<Self>, Error> {
 		// Load or compute layout, i.e. assignment of data blocks to the different data directories

@@ -163,7 +157,6 @@ impl BlockManager {
 		let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
 
 		let block_manager = Arc::new(Self {
-			replication,
 			data_layout: ArcSwap::new(Arc::new(data_layout)),
 			data_layout_persister,
 			data_fsync,

@@ -354,7 +347,7 @@ impl BlockManager {
 		data: Bytes,
 		order_tag: Option<OrderTag>,
 	) -> Result<(), Error> {
-		let who = self.replication.write_sets(&hash);
+		let who = self.system.layout_manager.write_sets_of(&hash);
 
 		let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
 			.await

@@ -374,7 +367,7 @@ impl BlockManager {
 			who.as_ref(),
 			put_block_rpc,
 			RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
-				.with_quorum(self.replication.write_quorum()),
+				.with_quorum(self.system.layout_manager.write_quorum()),
 		)
 		.await?;
 
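The two call-site changes above keep the same quorum-write pattern as before: the nodes responsible for the block's hash are looked up, the block is pushed to all of them, and the operation succeeds once a write quorum has acknowledged. Only the source of the write sets and the quorum changes (the layout manager instead of a per-table replication struct). A minimal, self-contained sketch of that pattern; all names here (write_with_quorum, NodeId, send) are illustrative, not Garage's API:

use std::collections::HashSet;

type NodeId = u64;

// Send `payload` to every node in `targets` and succeed once at least
// `quorum` of them acknowledge. Illustrative only; real code does this
// concurrently over RPC.
fn write_with_quorum(
    targets: &[NodeId],
    quorum: usize,
    payload: &[u8],
    send: impl Fn(NodeId, &[u8]) -> bool,
) -> Result<(), String> {
    let mut acks: HashSet<NodeId> = HashSet::new();
    for &node in targets {
        if send(node, payload) {
            acks.insert(node);
        }
        if acks.len() >= quorum {
            return Ok(()); // enough replicas hold the block
        }
    }
    Err(format!("got {} acks, needed {}", acks.len(), quorum))
}

fn main() {
    // Three storage nodes, write quorum of 2; pretend node 2 is down.
    let nodes = [1, 2, 3];
    assert!(write_with_quorum(&nodes, 2, b"block data", |n, _| n != 2).is_ok());
}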
@@ -28,8 +28,6 @@ use garage_util::tranquilizer::Tranquilizer;
 use garage_rpc::system::System;
 use garage_rpc::*;
 
-use garage_table::replication::TableReplication;
-
 use crate::manager::*;
 
 // The delay between the time where a resync operation fails

@@ -377,8 +375,12 @@ impl BlockResyncManager {
 			info!("Resync block {:?}: offloading and deleting", hash);
 			let existing_path = existing_path.unwrap();
 
-			let mut who = manager.replication.storage_nodes(hash);
-			if who.len() < manager.replication.write_quorum() {
+			let mut who = manager
+				.system
+				.layout_manager
+				.layout()
+				.storage_nodes_of(hash);
+			if who.len() < manager.system.layout_manager.write_quorum() {
 				return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
 			}
 			who.retain(|id| *id != manager.system.id);
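The rewritten guard above only offloads (and then deletes) a local block if the current cluster layout lists at least a write quorum of storage nodes for its hash, and only afterwards drops the local node from the push targets. A tiny sketch of that invariant with made-up types, not the actual resync code:

type NodeId = u32;

// Illustrative only: decide whether the local copy of a block can be offloaded.
// Mirrors the guard above: the quorum check counts every storage node listed by
// the layout (including this one), but the block is only pushed to the others.
fn offload_targets(
    mut storage_nodes: Vec<NodeId>,
    write_quorum: usize,
    me: NodeId,
) -> Result<Vec<NodeId>, String> {
    if storage_nodes.len() < write_quorum {
        return Err("not enough storage nodes to offload safely".into());
    }
    storage_nodes.retain(|id| *id != me);
    Ok(storage_nodes)
}

fn main() {
    assert_eq!(offload_targets(vec![1, 2, 3], 2, 1), Ok(vec![2, 3]));
    assert!(offload_targets(vec![1], 2, 1).is_err());
}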
@@ -230,18 +230,8 @@ impl Garage {
 		info!("Initialize membership management system...");
 		let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
 
-		let data_rep_param = TableShardedReplication {
-			system: system.clone(),
-			replication_factor: replication_factor.into(),
-			write_quorum: replication_factor.write_quorum(consistency_mode),
-			read_quorum: 1,
-		};
-
 		let meta_rep_param = TableShardedReplication {
-			system: system.clone(),
-			replication_factor: replication_factor.into(),
-			write_quorum: replication_factor.write_quorum(consistency_mode),
-			read_quorum: replication_factor.read_quorum(consistency_mode),
+			layout_manager: system.layout_manager.clone(),
 		};
 
 		let control_rep_param = TableFullReplication {

@@ -254,7 +244,6 @@ impl Garage {
 			config.data_dir.clone(),
 			config.data_fsync,
 			config.compression_level,
-			data_rep_param,
 			system.clone(),
 		)?;
 		block_manager.register_bg_vars(&mut bg_vars);
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
 use garage_util::data::*;
 
 use super::*;
-use crate::replication_mode::*;
+use crate::replication_mode::ReplicationFactor;
 
 #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
 pub struct RpcLayoutDigest {

@@ -30,7 +30,6 @@ pub struct SyncLayoutDigest {
 
 pub struct LayoutHelper {
 	replication_factor: ReplicationFactor,
-	consistency_mode: ConsistencyMode,
 	layout: Option<LayoutHistory>,
 
 	// cached values

@@ -59,7 +58,6 @@ impl Deref for LayoutHelper {
 impl LayoutHelper {
 	pub fn new(
 		replication_factor: ReplicationFactor,
-		consistency_mode: ConsistencyMode,
 		mut layout: LayoutHistory,
 		mut ack_lock: HashMap<u64, AtomicUsize>,
 	) -> Self {

@@ -68,13 +66,6 @@ impl LayoutHelper {
 		// correct and we have rapid access to important values such as
 		// the layout versions to use when reading to ensure consistency.
 
-		if consistency_mode != ConsistencyMode::Consistent {
-			// Fast path for when no consistency is required.
-			// In this case we only need to keep the last version of the layout,
-			// we don't care about coordinating stuff in the cluster.
-			layout.keep_current_version_only();
-		}
-
 		layout.cleanup_old_versions();
 
 		let all_nodes = layout.get_all_nodes();

@@ -117,7 +108,6 @@ impl LayoutHelper {
 
 		LayoutHelper {
 			replication_factor,
-			consistency_mode,
 			layout: Some(layout),
 			ack_map_min,
 			sync_map_min,

@@ -143,7 +133,6 @@ impl LayoutHelper {
 		if changed {
 			*self = Self::new(
 				self.replication_factor,
-				self.consistency_mode,
 				self.layout.take().unwrap(),
 				std::mem::take(&mut self.ack_lock),
 			);
@@ -21,6 +21,7 @@ use crate::system::*;
 pub struct LayoutManager {
 	node_id: Uuid,
 	replication_factor: ReplicationFactor,
+	consistency_mode: ConsistencyMode,
 	persist_cluster_layout: Persister<LayoutHistory>,
 
 	layout: Arc<RwLock<LayoutHelper>>,

@@ -64,12 +65,8 @@ impl LayoutManager {
 			}
 		};
 
-		let mut cluster_layout = LayoutHelper::new(
-			replication_factor,
-			consistency_mode,
-			cluster_layout,
-			Default::default(),
-		);
+		let mut cluster_layout =
+			LayoutHelper::new(replication_factor, cluster_layout, Default::default());
 		cluster_layout.update_trackers(node_id.into());
 
 		let layout = Arc::new(RwLock::new(cluster_layout));

@@ -85,6 +82,7 @@ impl LayoutManager {
 		Ok(Arc::new(Self {
 			node_id: node_id.into(),
 			replication_factor,
+			consistency_mode,
 			persist_cluster_layout,
 			layout,
 			change_notify,

@@ -141,6 +139,14 @@ impl LayoutManager {
 		}
 	}
 
+	pub fn read_quorum(self: &Arc<Self>) -> usize {
+		self.replication_factor.read_quorum(self.consistency_mode)
+	}
+
+	pub fn write_quorum(self: &Arc<Self>) -> usize {
+		self.replication_factor.write_quorum(self.consistency_mode)
+	}
+
 	// ---- ACK LOCKING ----
 
 	pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
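The new read_quorum() / write_quorum() helpers above delegate to ReplicationFactor together with the consistency mode now stored on LayoutManager; the actual formulas live in the replication_mode module and are not part of this diff. As a rough sketch, here is one consistent way such a derivation can work (majority write quorum, read quorum sized so that any read set overlaps any acknowledged write set); everything below is illustrative, not the real implementation:

// Hypothetical illustration of deriving quorums from a replication factor and
// a consistency mode; not Garage's actual formulas.
#[derive(Clone, Copy, PartialEq, Eq)]
enum ConsistencyMode {
    Consistent, // reads are guaranteed to see the latest acknowledged write
    Degraded,   // single-node reads are accepted
    Dangerous,  // single-node reads and writes are accepted
}

#[derive(Clone, Copy)]
struct ReplicationFactor(usize);

impl ReplicationFactor {
    // Majority write quorum: more than half of the replicas must acknowledge.
    fn write_quorum(self, mode: ConsistencyMode) -> usize {
        match mode {
            ConsistencyMode::Dangerous => 1,
            _ => self.0 / 2 + 1,
        }
    }

    // Sized so that read_quorum + write_quorum > replication_factor, i.e. any
    // read set intersects any successful write set.
    fn read_quorum(self, mode: ConsistencyMode) -> usize {
        match mode {
            ConsistencyMode::Consistent => self.0 - self.write_quorum(mode) + 1,
            _ => 1,
        }
    }
}

fn main() {
    let rf = ReplicationFactor(3);
    assert_eq!(rf.write_quorum(ConsistencyMode::Consistent), 2);
    assert_eq!(rf.read_quorum(ConsistencyMode::Consistent), 2);
}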
@@ -1,7 +1,7 @@
 use std::sync::Arc;
 
+use garage_rpc::layout::manager::LayoutManager;
 use garage_rpc::layout::*;
-use garage_rpc::system::System;
 use garage_util::data::*;
 
 use crate::replication::*;

@@ -15,42 +15,36 @@ use crate::replication::*;
 #[derive(Clone)]
 pub struct TableShardedReplication {
 	/// The membership manager of this node
-	pub system: Arc<System>,
-	/// How many time each data should be replicated
-	pub replication_factor: usize,
-	/// How many nodes to contact for a read, should be at most `replication_factor`
-	pub read_quorum: usize,
-	/// How many nodes to contact for a write, should be at most `replication_factor`
-	pub write_quorum: usize,
+	pub layout_manager: Arc<LayoutManager>,
 }
 
 impl TableReplication for TableShardedReplication {
 	type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
 	fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().storage_nodes_of(hash)
+		self.layout_manager.layout().storage_nodes_of(hash)
 	}
 
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().read_nodes_of(hash)
+		self.layout_manager.layout().read_nodes_of(hash)
 	}
 	fn read_quorum(&self) -> usize {
-		self.read_quorum
+		self.layout_manager.read_quorum()
 	}
 
 	fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-		self.system.layout_manager.write_sets_of(hash)
+		self.layout_manager.write_sets_of(hash)
 	}
 	fn write_quorum(&self) -> usize {
-		self.write_quorum
+		self.layout_manager.write_quorum()
 	}
 
 	fn partition_of(&self, hash: &Hash) -> Partition {
-		self.system.cluster_layout().current().partition_of(hash)
+		self.layout_manager.layout().current().partition_of(hash)
 	}
 
 	fn sync_partitions(&self) -> SyncPartitions {
-		let layout = self.system.cluster_layout();
+		let layout = self.layout_manager.layout();
 		let layout_version = layout.ack_map_min();
 
 		let mut partitions = layout
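After this change the TableReplication impl above is a pure delegation layer: every method forwards to the shared LayoutManager instead of reading quorum values copied onto each table's replication struct. In miniature, the shape looks like this (hypothetical types, sketching the design rather than the real code):

use std::sync::Arc;

// Hypothetical stand-ins for the shared layout manager and the per-table
// replication facade; names are illustrative, not Garage's types.
struct Manager {
    read_quorum: usize,
    write_quorum: usize,
}

impl Manager {
    fn read_quorum(&self) -> usize {
        self.read_quorum
    }
    fn write_quorum(&self) -> usize {
        self.write_quorum
    }
}

trait Replication {
    fn read_quorum(&self) -> usize;
    fn write_quorum(&self) -> usize;
}

// The facade keeps only an Arc to the manager: per-table copies of the quorum
// values are gone, so every table reads the same, single source of truth.
#[derive(Clone)]
struct ShardedReplication {
    manager: Arc<Manager>,
}

impl Replication for ShardedReplication {
    fn read_quorum(&self) -> usize {
        self.manager.read_quorum()
    }
    fn write_quorum(&self) -> usize {
        self.manager.write_quorum()
    }
}

fn main() {
    let manager = Arc::new(Manager { read_quorum: 2, write_quorum: 2 });
    let meta = ShardedReplication { manager: manager.clone() };
    let blocks = meta.clone();
    assert_eq!(meta.write_quorum(), blocks.write_quorum());
}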
@@ -84,7 +84,7 @@ where
 		&self.v
 	}
 
-	/// Take the value inside the CRDT (discards the timesamp)
+	/// Take the value inside the CRDT (discards the timestamp)
 	pub fn take(self) -> T {
 		self.v
 	}