diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 721b7c396..bcf98357f 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -328,7 +328,7 @@ impl System { let self2 = self.clone(); self.background .spawn_worker(format!("discovery loop"), |stop_signal| { - self2.discovery_loop(peers, stop_signal) + self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal) }); let self2 = self.clone(); @@ -336,14 +336,6 @@ impl System { .spawn_worker(format!("ping loop"), |stop_signal| { self2.ping_loop(stop_signal) }); - - if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { - let self2 = self.clone(); - self.background - .spawn_worker(format!("Consul loop"), |stop_signal| { - self2.consul_loop(stop_signal, consul_host, consul_service_name) - }); - } } async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { @@ -561,8 +553,15 @@ impl System { async fn discovery_loop( self: Arc, bootstrap_peers: Vec, + consul_host: Option, + consul_service_name: Option, mut stop_signal: watch::Receiver, ) { + let consul_config = match (consul_host, consul_service_name) { + (Some(ch), Some(csn)) => Some((ch, csn)), + _ => None, + }; + while !*stop_signal.borrow() { let not_configured = self.ring.borrow().config.members.len() == 0; let no_peers = self.status.borrow().nodes.len() < 3; @@ -571,50 +570,36 @@ impl System { .borrow() .nodes .iter() - .filter(|(_, v)| !v.is_up()) + .filter(|(_, v)| v.is_up()) .count() != self.ring.borrow().config.members.len(); if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let mut bp2 = bootstrap_peers + let mut ping_list = bootstrap_peers .iter() .map(|ip| (*ip, None)) .collect::>(); match self.persist_status.load_async().await { Ok(peers) => { - bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); + ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); } _ => (), } - self.clone().ping_nodes(bp2).await; - } - - let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); - select! { - _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => (), - } - } - } - - async fn consul_loop( - self: Arc, - mut stop_signal: watch::Receiver, - consul_host: String, - consul_service_name: String, - ) { - while !*stop_signal.borrow() { - match get_consul_nodes(&consul_host, &consul_service_name).await { - Ok(mut node_list) => { - let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::>(); - self.clone().ping_nodes(ping_addrs).await; - } - Err(e) => { - warn!("Could not retrieve node list from Consul: {}", e); + if let Some((consul_host, consul_service_name)) = &consul_config { + match get_consul_nodes(consul_host, consul_service_name).await { + Ok(node_list) => { + ping_list.extend(node_list.iter().map(|a| (*a, None))); + } + Err(e) => { + warn!("Could not retrieve node list from Consul: {}", e); + } + } } + + self.clone().ping_nodes(ping_list).await; } let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);