diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 39e29580..938fe954 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,26 +27,49 @@ impl TableReplication for TableFullReplication { type WriteSets = Vec>; fn storage_nodes(&self, _hash: &Hash) -> Vec { - let layout = self.system.cluster_layout(); - layout.current().all_nodes().to_vec() + self.system.cluster_layout().all_nodes().to_vec() } fn read_nodes(&self, _hash: &Hash) -> Vec { - vec![self.system.id] + self.system + .cluster_layout() + .read_version() + .all_nodes() + .to_vec() } fn read_quorum(&self) -> usize { - 1 + let layout = self.system.cluster_layout(); + let nodes = layout.read_version().all_nodes(); + nodes.len().div_euclid(2) + 1 } - fn write_sets(&self, hash: &Hash) -> Self::WriteSets { - vec![self.storage_nodes(hash)] + fn write_sets(&self, _hash: &Hash) -> Self::WriteSets { + self.system + .cluster_layout() + .versions() + .iter() + .map(|ver| ver.all_nodes().to_vec()) + .collect() } fn write_quorum(&self) -> usize { - let nmembers = self.system.cluster_layout().current().all_nodes().len(); - if nmembers < 3 { - 1 + let layout = self.system.cluster_layout(); + let min_len = layout + .versions() + .iter() + .map(|x| x.all_nodes().len()) + .min() + .unwrap(); + let max_quorum = layout + .versions() + .iter() + .map(|x| x.all_nodes().len().div_euclid(2) + 1) + .max() + .unwrap(); + if min_len < max_quorum { + warn!("Write quorum will not be respected for TableFullReplication operations due to multiple active layout versions with vastly different number of nodes"); + min_len } else { - nmembers.div_euclid(2) + 1 + max_quorum } } @@ -56,15 +79,18 @@ impl TableReplication for TableFullReplication { fn sync_partitions(&self) -> SyncPartitions { let layout = self.system.cluster_layout(); - let layout_version = layout.current().version; + let layout_version = layout.ack_map_min(); + + let partitions = vec![SyncPartition { + partition: 0u16, + first_hash: [0u8; 32].into(), + last_hash: [0xff; 32].into(), + storage_sets: self.write_sets(&[0u8; 32].into()), + }]; + SyncPartitions { layout_version, - partitions: vec![SyncPartition { - partition: 0u16, - first_hash: [0u8; 32].into(), - last_hash: [0xff; 32].into(), - storage_sets: vec![layout.current().all_nodes().to_vec()], - }], + partitions, } } }