Merge discovery loop with consul

This commit is contained in:
Alex 2021-04-05 23:04:08 +02:00
parent 948e44a3f6
commit 8c33d565d6
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1

View file

@ -328,7 +328,7 @@ impl System {
let self2 = self.clone(); let self2 = self.clone();
self.background self.background
.spawn_worker(format!("discovery loop"), |stop_signal| { .spawn_worker(format!("discovery loop"), |stop_signal| {
self2.discovery_loop(peers, stop_signal) self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
}); });
let self2 = self.clone(); let self2 = self.clone();
@ -336,14 +336,6 @@ impl System {
.spawn_worker(format!("ping loop"), |stop_signal| { .spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal) self2.ping_loop(stop_signal)
}); });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2.consul_loop(stop_signal, consul_host, consul_service_name)
});
}
} }
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@ -561,8 +553,15 @@ impl System {
async fn discovery_loop( async fn discovery_loop(
self: Arc<Self>, self: Arc<Self>,
bootstrap_peers: Vec<SocketAddr>, bootstrap_peers: Vec<SocketAddr>,
consul_host: Option<String>,
consul_service_name: Option<String>,
mut stop_signal: watch::Receiver<bool>, mut stop_signal: watch::Receiver<bool>,
) { ) {
let consul_config = match (consul_host, consul_service_name) {
(Some(ch), Some(csn)) => Some((ch, csn)),
_ => None,
};
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.len() == 0; let not_configured = self.ring.borrow().config.members.len() == 0;
let no_peers = self.status.borrow().nodes.len() < 3; let no_peers = self.status.borrow().nodes.len() < 3;
@ -571,50 +570,36 @@ impl System {
.borrow() .borrow()
.nodes .nodes
.iter() .iter()
.filter(|(_, v)| !v.is_up()) .filter(|(_, v)| v.is_up())
.count() != self.ring.borrow().config.members.len(); .count() != self.ring.borrow().config.members.len();
if not_configured || no_peers || bad_peers { if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
let mut bp2 = bootstrap_peers let mut ping_list = bootstrap_peers
.iter() .iter()
.map(|ip| (*ip, None)) .map(|ip| (*ip, None))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
match self.persist_status.load_async().await { match self.persist_status.load_async().await {
Ok(peers) => { Ok(peers) => {
bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
} }
_ => (), _ => (),
} }
self.clone().ping_nodes(bp2).await; if let Some((consul_host, consul_service_name)) = &consul_config {
} match get_consul_nodes(consul_host, consul_service_name).await {
Ok(node_list) => {
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); ping_list.extend(node_list.iter().map(|a| (*a, None)));
select! { }
_ = restart_at.fuse() => (), Err(e) => {
_ = stop_signal.changed().fuse() => (), warn!("Could not retrieve node list from Consul: {}", e);
} }
} }
}
async fn consul_loop(
self: Arc<Self>,
mut stop_signal: watch::Receiver<bool>,
consul_host: String,
consul_service_name: String,
) {
while !*stop_signal.borrow() {
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
self.clone().ping_nodes(ping_addrs).await;
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
} }
self.clone().ping_nodes(ping_list).await;
} }
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);