layout: refactoring of all_nodes
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is failing

This commit is contained in:
Alex 2023-11-14 13:06:16 +01:00
parent 8e292e06b3
commit 1aab1f4e68
Signed by: lx
GPG key ID: 0E496D15096376BE
7 changed files with 44 additions and 27 deletions

View file

@ -126,8 +126,8 @@ impl AdminRpcHandler {
opt_to_send.all_nodes = false; opt_to_send.all_nodes = false;
let mut failures = vec![]; let mut failures = vec![];
let layout = self.garage.system.cluster_layout().clone(); let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in layout.current().node_ids().iter() { for node in all_nodes.iter() {
let node = (*node).into(); let node = (*node).into();
let resp = self let resp = self
.endpoint .endpoint
@ -163,9 +163,9 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes { if opt.all_nodes {
let mut ret = String::new(); let mut ret = String::new();
let layout = self.garage.system.cluster_layout().clone(); let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in layout.current().node_ids().iter() { for node in all_nodes.iter() {
let mut opt = opt.clone(); let mut opt = opt.clone();
opt.all_nodes = false; opt.all_nodes = false;
opt.skip_global = true; opt.skip_global = true;
@ -275,6 +275,7 @@ impl AdminRpcHandler {
let mut ret = String::new(); let mut ret = String::new();
// Gather storage node and free space statistics // Gather storage node and free space statistics
// TODO: not only layout.current() ???
let layout = &self.garage.system.cluster_layout(); let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new(); let mut node_partition_count = HashMap::<Uuid, u64>::new();
for short_id in layout.current().ring_assignment_data.iter() { for short_id in layout.current().ring_assignment_data.iter() {
@ -440,8 +441,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> { ) -> Result<AdminRpc, Error> {
if all_nodes { if all_nodes {
let mut ret = vec![]; let mut ret = vec![];
let layout = self.garage.system.cluster_layout().clone(); let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in layout.current().node_ids().iter() { for node in all_nodes.iter() {
let node = (*node).into(); let node = (*node).into();
match self match self
.endpoint .endpoint
@ -488,8 +489,8 @@ impl AdminRpcHandler {
) -> Result<AdminRpc, Error> { ) -> Result<AdminRpc, Error> {
if all_nodes { if all_nodes {
let mut ret = vec![]; let mut ret = vec![];
let layout = self.garage.system.cluster_layout().clone(); let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in layout.current().node_ids().iter() { for node in all_nodes.iter() {
let node = (*node).into(); let node = (*node).into();
match self match self
.endpoint .endpoint

View file

@ -49,6 +49,7 @@ pub async fn cmd_assign_role(
}; };
let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let all_nodes = layout.all_nodes().into_owned();
let added_nodes = args let added_nodes = args
.node_ids .node_ids
@ -58,7 +59,7 @@ pub async fn cmd_assign_role(
status status
.iter() .iter()
.map(|adv| adv.id) .map(|adv| adv.id)
.chain(layout.current().node_ids().iter().cloned()), .chain(all_nodes.iter().cloned()),
node_id, node_id,
) )
}) })
@ -68,8 +69,7 @@ pub async fn cmd_assign_role(
roles.merge(&layout.staging.get().roles); roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() { for replaced in args.replace.iter() {
let replaced_node = let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
find_matching_node(layout.current().node_ids().iter().cloned(), replaced)?;
match roles.get(&replaced_node) { match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => { Some(NodeRoleV(Some(_))) => {
layout layout

View file

@ -450,8 +450,12 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
{ {
// TODO: not only current let node_id_vec = self
let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec(); .0
.system
.cluster_layout()
.all_nongateway_nodes()
.into_owned();
let k2vindexes = self let k2vindexes = self
.0 .0
.k2v .k2v

View file

@ -60,6 +60,21 @@ impl LayoutHistory {
(self.current().version, self.all_ack(), self.min_stored()) (self.current().version, self.all_ack(), self.min_stored())
} }
pub fn all_nodes(&self) -> Cow<'_, [Uuid]> {
// TODO: cache this
if self.versions.len() == 1 {
self.versions[0].all_nodes().into()
} else {
let set = self
.versions
.iter()
.map(|x| x.all_nodes())
.flatten()
.collect::<HashSet<_>>();
set.into_iter().copied().collect::<Vec<_>>().into()
}
}
pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> { pub fn all_nongateway_nodes(&self) -> Cow<'_, [Uuid]> {
// TODO: cache this // TODO: cache this
if self.versions.len() == 1 { if self.versions.len() == 1 {

View file

@ -38,22 +38,19 @@ impl LayoutVersion {
// ===================== accessors ====================== // ===================== accessors ======================
/// Returns a list of IDs of nodes that currently have /// Returns a list of IDs of nodes that have a role in this
/// a role in the cluster /// version of the cluster layout, including gateway nodes
pub fn node_ids(&self) -> &[Uuid] { pub fn all_nodes(&self) -> &[Uuid] {
&self.node_id_vec[..] &self.node_id_vec[..]
} }
/// Returns the uuids of the non_gateway nodes in self.node_id_vec. /// Returns a list of IDs of nodes that have a storage capacity
/// assigned in this version of the cluster layout
pub fn nongateway_nodes(&self) -> &[Uuid] { pub fn nongateway_nodes(&self) -> &[Uuid] {
&self.node_id_vec[..self.nongateway_node_count] &self.node_id_vec[..self.nongateway_node_count]
} }
pub fn num_nodes(&self) -> usize { /// Returns the role of a node in the layout, if it has one
self.node_id_vec.len()
}
/// Returns the role of a node in the layout
pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> {
match self.roles.get(node) { match self.roles.get(node) {
Some(NodeRoleV(Some(v))) => Some(v), Some(NodeRoleV(Some(v))) => Some(v),
@ -61,7 +58,7 @@ impl LayoutVersion {
} }
} }
/// Given a node uuids, this function returns its capacity or fails if it does not have any /// Returns the capacity of a node in the layout, if it has one
pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> { pub fn get_node_capacity(&self, uuid: &Uuid) -> Option<u64> {
match self.node_role(uuid) { match self.node_role(uuid) {
Some(NodeRole { Some(NodeRole {

View file

@ -609,7 +609,7 @@ impl System {
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let not_configured = self.cluster_layout().check().is_err(); let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
let expected_n_nodes = self.cluster_layout().current().num_nodes(); let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = self let bad_peers = self
.fullmesh .fullmesh
.get_peer_list() .get_peer_list()

View file

@ -35,10 +35,10 @@ impl TableReplication for TableFullReplication {
} }
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> { fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().current().node_ids().to_vec() self.system.cluster_layout().current().all_nodes().to_vec()
} }
fn write_quorum(&self) -> usize { fn write_quorum(&self) -> usize {
let nmembers = self.system.cluster_layout().current().node_ids().len(); let nmembers = self.system.cluster_layout().current().all_nodes().len();
if nmembers > self.max_faults { if nmembers > self.max_faults {
nmembers - self.max_faults nmembers - self.max_faults
} else { } else {
@ -62,7 +62,7 @@ impl TableReplication for TableFullReplication {
partition: 0u16, partition: 0u16,
first_hash: [0u8; 32].into(), first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(), last_hash: [0xff; 32].into(),
storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()), storage_nodes: Vec::from_iter(layout.current().all_nodes().to_vec()),
}], }],
} }
} }