cluster layout: adapt all uses of ClusterLayout to LayoutHistory

This commit is contained in:
Alex 2023-11-08 19:28:36 +01:00
parent fe9af1dcaa
commit 8dccee3ccf
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
12 changed files with 80 additions and 69 deletions

View file

@ -89,8 +89,9 @@ pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<
Ok(json_ok_response(&res)?) Ok(json_ok_response(&res)?)
} }
fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse { fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
let roles = layout let roles = layout
.current()
.roles .roles
.items() .items()
.iter() .iter()
@ -107,7 +108,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp
.staging_roles .staging_roles
.items() .items()
.iter() .iter()
.filter(|(k, _, v)| layout.roles.get(k) != Some(v)) .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v))
.map(|(k, _, v)| match &v.0 { .map(|(k, _, v)| match &v.0 {
None => NodeRoleChange { None => NodeRoleChange {
id: hex::encode(k), id: hex::encode(k),
@ -125,7 +126,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp
.collect::<Vec<_>>(); .collect::<Vec<_>>();
GetClusterLayoutResponse { GetClusterLayoutResponse {
version: layout.version, version: layout.current().version,
roles, roles,
staged_role_changes, staged_role_changes,
} }
@ -209,7 +210,7 @@ pub async fn handle_update_cluster_layout(
let mut layout = garage.system.cluster_layout().as_ref().clone(); let mut layout = garage.system.cluster_layout().as_ref().clone();
let mut roles = layout.roles.clone(); let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging_roles); roles.merge(&layout.staging_roles);
for change in updates { for change in updates {

View file

@ -5,7 +5,7 @@ use serde::Serialize;
use garage_util::data::*; use garage_util::data::*;
use garage_rpc::layout::ClusterLayout; use garage_rpc::layout::LayoutHistory;
use garage_table::util::*; use garage_table::util::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
@ -26,7 +26,7 @@ pub async fn handle_read_index(
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let reverse = reverse.unwrap_or(false); let reverse = reverse.unwrap_or(false);
let layout: Arc<ClusterLayout> = garage.system.cluster_layout().clone(); let layout: Arc<LayoutHistory> = garage.system.cluster_layout().clone();
let (partition_keys, more, next_start) = read_range( let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table, &garage.k2v.counter_table.table,
@ -35,7 +35,10 @@ pub async fn handle_read_index(
&start, &start,
&end, &end,
limit, limit,
Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), Some((
DeletedFilter::NotDeleted,
layout.current().node_id_vec.clone(),
)),
EnumerationOrder::from_reverse(reverse), EnumerationOrder::from_reverse(reverse),
) )
.await?; .await?;

View file

@ -127,7 +127,7 @@ impl AdminRpcHandler {
let mut failures = vec![]; let mut failures = vec![];
let layout = self.garage.system.cluster_layout().clone(); let layout = self.garage.system.cluster_layout().clone();
for node in layout.node_ids().iter() { for node in layout.current().node_ids().iter() {
let node = (*node).into(); let node = (*node).into();
let resp = self let resp = self
.endpoint .endpoint
@ -165,7 +165,7 @@ impl AdminRpcHandler {
let mut ret = String::new(); let mut ret = String::new();
let layout = self.garage.system.cluster_layout().clone(); let layout = self.garage.system.cluster_layout().clone();
for node in layout.node_ids().iter() { for node in layout.current().node_ids().iter() {
let mut opt = opt.clone(); let mut opt = opt.clone();
opt.all_nodes = false; opt.all_nodes = false;
opt.skip_global = true; opt.skip_global = true;
@ -277,8 +277,8 @@ impl AdminRpcHandler {
// Gather storage node and free space statistics // Gather storage node and free space statistics
let layout = &self.garage.system.cluster_layout(); let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new(); let mut node_partition_count = HashMap::<Uuid, u64>::new();
for short_id in layout.ring_assignment_data.iter() { for short_id in layout.current().ring_assignment_data.iter() {
let id = layout.node_id_vec[*short_id as usize]; let id = layout.current().node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1; *node_partition_count.entry(id).or_default() += 1;
} }
let node_info = self let node_info = self
@ -293,7 +293,7 @@ impl AdminRpcHandler {
for (id, parts) in node_partition_count.iter() { for (id, parts) in node_partition_count.iter() {
let info = node_info.get(id); let info = node_info.get(id);
let status = info.map(|x| &x.status); let status = info.map(|x| &x.status);
let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role let capacity = role
@ -441,7 +441,7 @@ impl AdminRpcHandler {
if all_nodes { if all_nodes {
let mut ret = vec![]; let mut ret = vec![];
let layout = self.garage.system.cluster_layout().clone(); let layout = self.garage.system.cluster_layout().clone();
for node in layout.node_ids().iter() { for node in layout.current().node_ids().iter() {
let node = (*node).into(); let node = (*node).into();
match self match self
.endpoint .endpoint
@ -489,7 +489,7 @@ impl AdminRpcHandler {
if all_nodes { if all_nodes {
let mut ret = vec![]; let mut ret = vec![];
let layout = self.garage.system.cluster_layout().clone(); let layout = self.garage.system.cluster_layout().clone();
for node in layout.node_ids().iter() { for node in layout.current().node_ids().iter() {
let node = (*node).into(); let node = (*node).into();
match self match self
.endpoint .endpoint

View file

@ -62,7 +62,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
let mut healthy_nodes = let mut healthy_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) { for adv in status.iter().filter(|adv| adv.is_up) {
match layout.roles.get(&adv.id) { match layout.current().roles.get(&adv.id) {
Some(NodeRoleV(Some(cfg))) => { Some(NodeRoleV(Some(cfg))) => {
let data_avail = match &adv.status.data_disk_avail { let data_avail = match &adv.status.data_disk_avail {
_ if cfg.capacity.is_none() => "N/A".into(), _ if cfg.capacity.is_none() => "N/A".into(),
@ -102,10 +102,15 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
format_table(healthy_nodes); format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
let failure_case_1 = status let failure_case_1 = status.iter().any(|adv| {
.iter() !adv.is_up
.any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_))))); && matches!(
layout.current().roles.get(&adv.id),
Some(NodeRoleV(Some(_)))
)
});
let failure_case_2 = layout let failure_case_2 = layout
.current()
.roles .roles
.items() .items()
.iter() .iter()
@ -115,7 +120,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
let mut failed_nodes = let mut failed_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) { for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
let tf = timeago::Formatter::new(); let tf = timeago::Formatter::new();
failed_nodes.push(format!( failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
@ -132,7 +137,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
)); ));
} }
} }
for (id, _, role_v) in layout.roles.items().iter() { for (id, _, role_v) in layout.current().roles.items().iter() {
if let NodeRoleV(Some(cfg)) = role_v { if let NodeRoleV(Some(cfg)) = role_v {
if !status_keys.contains(id) { if !status_keys.contains(id) {
failed_nodes.push(format!( failed_nodes.push(format!(

View file

@ -58,17 +58,18 @@ pub async fn cmd_assign_role(
status status
.iter() .iter()
.map(|adv| adv.id) .map(|adv| adv.id)
.chain(layout.node_ids().iter().cloned()), .chain(layout.current().node_ids().iter().cloned()),
node_id, node_id,
) )
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
let mut roles = layout.roles.clone(); let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging_roles); roles.merge(&layout.staging_roles);
for replaced in args.replace.iter() { for replaced in args.replace.iter() {
let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?; let replaced_node =
find_matching_node(layout.current().node_ids().iter().cloned(), replaced)?;
match roles.get(&replaced_node) { match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => { Some(NodeRoleV(Some(_))) => {
layout layout
@ -149,7 +150,7 @@ pub async fn cmd_remove_role(
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut roles = layout.roles.clone(); let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging_roles); roles.merge(&layout.staging_roles);
let deleted_node = let deleted_node =
@ -174,13 +175,16 @@ pub async fn cmd_show_layout(
let layout = fetch_layout(rpc_cli, rpc_host).await?; let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ===="); println!("==== CURRENT CLUSTER LAYOUT ====");
print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!(); println!();
println!("Current cluster layout version: {}", layout.version); println!(
"Current cluster layout version: {}",
layout.current().version
);
let has_role_changes = print_staging_role_changes(&layout); let has_role_changes = print_staging_role_changes(&layout);
if has_role_changes { if has_role_changes {
let v = layout.version; let v = layout.current().version;
let res_apply = layout.apply_staged_changes(Some(v + 1)); let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions // this will print the stats of what partitions
@ -189,7 +193,7 @@ pub async fn cmd_show_layout(
Ok((layout, msg)) => { Ok((layout, msg)) => {
println!(); println!();
println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
print_cluster_layout(&layout, "No nodes have a role in the new layout."); print_cluster_layout(layout.current(), "No nodes have a role in the new layout.");
println!(); println!();
for line in msg.iter() { for line in msg.iter() {
@ -266,11 +270,11 @@ pub async fn cmd_config_layout(
.parse::<ZoneRedundancy>() .parse::<ZoneRedundancy>()
.ok_or_message("invalid zone redundancy value")?; .ok_or_message("invalid zone redundancy value")?;
if let ZoneRedundancy::AtLeast(r_int) = r { if let ZoneRedundancy::AtLeast(r_int) = r {
if r_int > layout.replication_factor { if r_int > layout.current().replication_factor {
return Err(Error::Message(format!( return Err(Error::Message(format!(
"The zone redundancy must be smaller or equal to the \ "The zone redundancy must be smaller or equal to the \
replication factor ({}).", replication factor ({}).",
layout.replication_factor layout.current().replication_factor
))); )));
} else if r_int < 1 { } else if r_int < 1 {
return Err(Error::Message( return Err(Error::Message(
@ -302,7 +306,7 @@ pub async fn cmd_config_layout(
pub async fn fetch_layout( pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>, rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
) -> Result<ClusterLayout, Error> { ) -> Result<LayoutHistory, Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await?? .await??
@ -315,7 +319,7 @@ pub async fn fetch_layout(
pub async fn send_layout( pub async fn send_layout(
rpc_cli: &Endpoint<SystemRpc, ()>, rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID, rpc_host: NodeID,
layout: ClusterLayout, layout: LayoutHistory,
) -> Result<(), Error> { ) -> Result<(), Error> {
rpc_cli rpc_cli
.call( .call(
@ -327,7 +331,7 @@ pub async fn send_layout(
Ok(()) Ok(())
} }
pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) {
let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() { for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 { let role = match &role.0 {
@ -366,13 +370,13 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
} }
} }
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
let has_role_changes = layout let has_role_changes = layout
.staging_roles .staging_roles
.items() .items()
.iter() .iter()
.any(|(k, _, v)| layout.roles.get(k) != Some(v)); .any(|(k, _, v)| layout.current().roles.get(k) != Some(v));
let has_layout_changes = *layout.staging_parameters.get() != layout.parameters; let has_layout_changes = *layout.staging_parameters.get() != layout.current().parameters;
if has_role_changes || has_layout_changes { if has_role_changes || has_layout_changes {
println!(); println!();
@ -380,7 +384,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
if has_role_changes { if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for (id, _, role) in layout.staging_roles.items().iter() { for (id, _, role) in layout.staging_roles.items().iter() {
if layout.roles.get(id) == Some(role) { if layout.current().roles.get(id) == Some(role) {
continue; continue;
} }
if let Some(role) = &role.0 { if let Some(role) = &role.0 {

View file

@ -450,10 +450,10 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
{ {
use garage_rpc::layout::ClusterLayout; use garage_rpc::layout::LayoutHistory;
use std::sync::Arc; use std::sync::Arc;
let layout: Arc<ClusterLayout> = self.0.system.cluster_layout().clone(); let layout: Arc<LayoutHistory> = self.0.system.cluster_layout().clone();
let k2vindexes = self let k2vindexes = self
.0 .0
.k2v .k2v
@ -462,7 +462,10 @@ impl<'a> BucketHelper<'a> {
.get_range( .get_range(
&bucket_id, &bucket_id,
None, None,
Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), Some((
DeletedFilter::NotDeleted,
layout.current().node_id_vec.clone(),
)),
10, 10,
EnumerationOrder::Forward, EnumerationOrder::Forward,
) )

View file

@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db; use garage_db as db;
use garage_rpc::layout::ClusterLayout; use garage_rpc::layout::LayoutHistory;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
@ -83,8 +83,8 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
} }
impl<T: CountedItem> CounterEntry<T> { impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, layout: &ClusterLayout) -> HashMap<String, i64> { pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> {
let nodes = &layout.node_id_vec[..]; let nodes = &layout.current().node_id_vec[..];
self.filtered_values_with_nodes(nodes) self.filtered_values_with_nodes(nodes)
} }

View file

@ -1,5 +1,4 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::sync::Arc;
use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*; use garage_util::data::*;
@ -64,24 +63,22 @@ impl LayoutHistory {
} }
// Add any new versions to history // Add any new versions to history
let mut versions = self.versions.to_vec();
for v2 in other.versions.iter() { for v2 in other.versions.iter() {
if let Some(v1) = versions.iter().find(|v| v.version == v2.version) { if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
if v1 != v2 { if v1 != v2 {
error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
} }
} else if versions.iter().all(|v| v.version != v2.version - 1) { } else if self.versions.iter().all(|v| v.version != v2.version - 1) {
error!( error!(
"Cannot receive new layout version {}, version {} is missing", "Cannot receive new layout version {}, version {} is missing",
v2.version, v2.version,
v2.version - 1 v2.version - 1
); );
} else { } else {
versions.push(v2.clone()); self.versions.push(v2.clone());
changed = true; changed = true;
} }
} }
self.versions = Arc::from(versions.into_boxed_slice());
// Merge trackers // Merge trackers
self.update_trackers.merge(&other.update_trackers); self.update_trackers.merge(&other.update_trackers);
@ -117,9 +114,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
let msg = new_version.calculate_partition_assignment()?; let msg = new_version.calculate_partition_assignment()?;
let mut versions = self.versions.to_vec(); self.versions.push(new_version);
versions.push(new_version);
self.versions = Arc::from(versions.into_boxed_slice());
Ok((self, msg)) Ok((self, msg))
} }
@ -149,9 +144,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
let mut new_version = self.current().clone(); let mut new_version = self.current().clone();
new_version.version += 1; new_version.version += 1;
let mut versions = self.versions.to_vec(); self.versions.push(new_version);
versions.push(new_version);
self.versions = Arc::from(versions.into_boxed_slice());
Ok(self) Ok(self)
} }

View file

@ -184,7 +184,6 @@ mod v010 {
use garage_util::data::{Hash, Uuid}; use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc;
pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
/// The layout of the cluster, i.e. the list of roles /// The layout of the cluster, i.e. the list of roles
@ -215,7 +214,7 @@ mod v010 {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LayoutHistory { pub struct LayoutHistory {
/// The versions currently in use in the cluster /// The versions currently in use in the cluster
pub versions: Arc<[LayoutVersion]>, pub versions: Vec<LayoutVersion>,
/// Update trackers /// Update trackers
pub update_trackers: UpdateTrackers, pub update_trackers: UpdateTrackers,
@ -267,7 +266,7 @@ mod v010 {
.collect::<HashMap<Uuid, u64>>(), .collect::<HashMap<Uuid, u64>>(),
); );
let mut ret = Self { let mut ret = Self {
versions: Arc::from(vec![version].into_boxed_slice()), versions: vec![version],
update_trackers: UpdateTrackers { update_trackers: UpdateTrackers {
ack_map: update_tracker.clone(), ack_map: update_tracker.clone(),
sync_map: update_tracker.clone(), sync_map: update_tracker.clone(),

View file

@ -27,11 +27,10 @@ impl TableReplication for TableFullReplication {
} }
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> { fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout(); self.system.cluster_layout().current().node_ids().to_vec()
layout.node_ids().to_vec()
} }
fn write_quorum(&self) -> usize { fn write_quorum(&self) -> usize {
let nmembers = self.system.cluster_layout().node_ids().len(); let nmembers = self.system.cluster_layout().current().node_ids().len();
if nmembers > self.max_faults { if nmembers > self.max_faults {
nmembers - self.max_faults nmembers - self.max_faults
} else { } else {

View file

@ -26,16 +26,20 @@ pub struct TableShardedReplication {
impl TableReplication for TableShardedReplication { impl TableReplication for TableShardedReplication {
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> { fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout(); self.system
layout.nodes_of(hash, self.replication_factor) .cluster_layout()
.current()
.nodes_of(hash, self.replication_factor)
} }
fn read_quorum(&self) -> usize { fn read_quorum(&self) -> usize {
self.read_quorum self.read_quorum
} }
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> { fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let layout = self.system.cluster_layout(); self.system
layout.nodes_of(hash, self.replication_factor) .cluster_layout()
.current()
.nodes_of(hash, self.replication_factor)
} }
fn write_quorum(&self) -> usize { fn write_quorum(&self) -> usize {
self.write_quorum self.write_quorum
@ -45,9 +49,9 @@ impl TableReplication for TableShardedReplication {
} }
fn partition_of(&self, hash: &Hash) -> Partition { fn partition_of(&self, hash: &Hash) -> Partition {
self.system.cluster_layout().partition_of(hash) self.system.cluster_layout().current().partition_of(hash)
} }
fn partitions(&self) -> Vec<(Partition, Hash)> { fn partitions(&self) -> Vec<(Partition, Hash)> {
self.system.cluster_layout().partitions() self.system.cluster_layout().current().partitions()
} }
} }

View file

@ -492,8 +492,8 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> { struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>, syncer: Arc<TableSyncer<F, R>>,
layout_watch: watch::Receiver<Arc<ClusterLayout>>, layout_watch: watch::Receiver<Arc<LayoutHistory>>,
layout: Arc<ClusterLayout>, layout: Arc<LayoutHistory>,
add_full_sync_rx: mpsc::UnboundedReceiver<()>, add_full_sync_rx: mpsc::UnboundedReceiver<()>,
todo: Vec<TodoPartition>, todo: Vec<TodoPartition>,
next_full_sync: Instant, next_full_sync: Instant,