Merge discovery loop with consul
This commit is contained in:
parent
7fd1f9a869
commit
7b85056942
1 changed files with 22 additions and 37 deletions
|
@ -328,7 +328,7 @@ impl System {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
self.background
|
self.background
|
||||||
.spawn_worker(format!("discovery loop"), |stop_signal| {
|
.spawn_worker(format!("discovery loop"), |stop_signal| {
|
||||||
self2.discovery_loop(peers, stop_signal)
|
self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
|
||||||
});
|
});
|
||||||
|
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
|
@ -336,14 +336,6 @@ impl System {
|
||||||
.spawn_worker(format!("ping loop"), |stop_signal| {
|
.spawn_worker(format!("ping loop"), |stop_signal| {
|
||||||
self2.ping_loop(stop_signal)
|
self2.ping_loop(stop_signal)
|
||||||
});
|
});
|
||||||
|
|
||||||
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
|
|
||||||
let self2 = self.clone();
|
|
||||||
self.background
|
|
||||||
.spawn_worker(format!("Consul loop"), |stop_signal| {
|
|
||||||
self2.consul_loop(stop_signal, consul_host, consul_service_name)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
||||||
|
@ -561,8 +553,15 @@ impl System {
|
||||||
async fn discovery_loop(
|
async fn discovery_loop(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
bootstrap_peers: Vec<SocketAddr>,
|
bootstrap_peers: Vec<SocketAddr>,
|
||||||
|
consul_host: Option<String>,
|
||||||
|
consul_service_name: Option<String>,
|
||||||
mut stop_signal: watch::Receiver<bool>,
|
mut stop_signal: watch::Receiver<bool>,
|
||||||
) {
|
) {
|
||||||
|
let consul_config = match (consul_host, consul_service_name) {
|
||||||
|
(Some(ch), Some(csn)) => Some((ch, csn)),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
while !*stop_signal.borrow() {
|
while !*stop_signal.borrow() {
|
||||||
let not_configured = self.ring.borrow().config.members.len() == 0;
|
let not_configured = self.ring.borrow().config.members.len() == 0;
|
||||||
let no_peers = self.status.borrow().nodes.len() < 3;
|
let no_peers = self.status.borrow().nodes.len() < 3;
|
||||||
|
@ -571,50 +570,36 @@ impl System {
|
||||||
.borrow()
|
.borrow()
|
||||||
.nodes
|
.nodes
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, v)| !v.is_up())
|
.filter(|(_, v)| v.is_up())
|
||||||
.count() != self.ring.borrow().config.members.len();
|
.count() != self.ring.borrow().config.members.len();
|
||||||
|
|
||||||
if not_configured || no_peers || bad_peers {
|
if not_configured || no_peers || bad_peers {
|
||||||
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
|
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
|
||||||
|
|
||||||
let mut bp2 = bootstrap_peers
|
let mut ping_list = bootstrap_peers
|
||||||
.iter()
|
.iter()
|
||||||
.map(|ip| (*ip, None))
|
.map(|ip| (*ip, None))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
match self.persist_status.load_async().await {
|
match self.persist_status.load_async().await {
|
||||||
Ok(peers) => {
|
Ok(peers) => {
|
||||||
bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
|
ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
|
||||||
self.clone().ping_nodes(bp2).await;
|
if let Some((consul_host, consul_service_name)) = &consul_config {
|
||||||
}
|
match get_consul_nodes(consul_host, consul_service_name).await {
|
||||||
|
Ok(node_list) => {
|
||||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
ping_list.extend(node_list.iter().map(|a| (*a, None)));
|
||||||
select! {
|
}
|
||||||
_ = restart_at.fuse() => (),
|
Err(e) => {
|
||||||
_ = stop_signal.changed().fuse() => (),
|
warn!("Could not retrieve node list from Consul: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
async fn consul_loop(
|
|
||||||
self: Arc<Self>,
|
|
||||||
mut stop_signal: watch::Receiver<bool>,
|
|
||||||
consul_host: String,
|
|
||||||
consul_service_name: String,
|
|
||||||
) {
|
|
||||||
while !*stop_signal.borrow() {
|
|
||||||
match get_consul_nodes(&consul_host, &consul_service_name).await {
|
|
||||||
Ok(mut node_list) => {
|
|
||||||
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
|
|
||||||
self.clone().ping_nodes(ping_addrs).await;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Could not retrieve node list from Consul: {}", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.clone().ping_nodes(ping_list).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
||||||
|
|
Loading…
Reference in a new issue