Properly implement watches for Basalt
This commit is contained in:
parent
bc86bd3986
commit
7eea46dcf3
3 changed files with 20 additions and 15 deletions
|
@ -12,6 +12,8 @@ use structopt::StructOpt;
|
||||||
use sodiumoxide::crypto::auth;
|
use sodiumoxide::crypto::auth;
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use netapp::endpoint::*;
|
use netapp::endpoint::*;
|
||||||
use netapp::peering::basalt::*;
|
use netapp::peering::basalt::*;
|
||||||
use netapp::proto::*;
|
use netapp::proto::*;
|
||||||
|
@ -126,18 +128,18 @@ async fn main() {
|
||||||
let watch_cancel = netapp::util::watch_ctrl_c();
|
let watch_cancel = netapp::util::watch_ctrl_c();
|
||||||
|
|
||||||
tokio::join!(
|
tokio::join!(
|
||||||
example.clone().sampling_loop(),
|
example.clone().sampling_loop(watch_cancel.clone()),
|
||||||
example
|
example
|
||||||
.netapp
|
.netapp
|
||||||
.clone()
|
.clone()
|
||||||
.listen(listen_addr, public_addr, watch_cancel),
|
.listen(listen_addr, public_addr, watch_cancel.clone()),
|
||||||
example.basalt.clone().run(),
|
example.basalt.clone().run(watch_cancel.clone()),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Example {
|
impl Example {
|
||||||
async fn sampling_loop(self: Arc<Self>) {
|
async fn sampling_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
||||||
loop {
|
while !*must_exit.borrow() {
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||||
|
|
||||||
let peers = self.basalt.sample(10);
|
let peers = self.basalt.sample(10);
|
||||||
|
|
|
@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use sodiumoxide::crypto::hash;
|
use sodiumoxide::crypto::hash;
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use crate::endpoint::*;
|
use crate::endpoint::*;
|
||||||
use crate::netapp::*;
|
use crate::netapp::*;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
|
@ -303,18 +305,19 @@ impl Basalt {
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(self: Arc<Self>) {
|
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
||||||
for peer in self.bootstrap_peers.iter() {
|
for peer in self.bootstrap_peers.iter() {
|
||||||
tokio::spawn(self.clone().try_connect(*peer));
|
tokio::spawn(self.clone().try_connect(*peer));
|
||||||
}
|
}
|
||||||
|
|
||||||
let pushpull_loop = self.clone().run_pushpull_loop();
|
tokio::join!(
|
||||||
let reset_loop = self.run_reset_loop();
|
self.clone().run_pushpull_loop(must_exit.clone()),
|
||||||
tokio::join!(pushpull_loop, reset_loop);
|
self.clone().run_reset_loop(must_exit.clone()),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_pushpull_loop(self: Arc<Self>) {
|
async fn run_pushpull_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
||||||
loop {
|
while !*must_exit.borrow() {
|
||||||
tokio::time::sleep(self.param.exchange_interval).await;
|
tokio::time::sleep(self.param.exchange_interval).await;
|
||||||
|
|
||||||
let peers = self.view.read().unwrap().sample(2);
|
let peers = self.view.read().unwrap().sample(2);
|
||||||
|
@ -360,8 +363,8 @@ impl Basalt {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_reset_loop(self: Arc<Self>) {
|
async fn run_reset_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
||||||
loop {
|
while !*must_exit.borrow() {
|
||||||
tokio::time::sleep(self.param.reset_interval).await;
|
tokio::time::sleep(self.param.reset_interval).await;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
@ -178,12 +178,12 @@ impl FullMeshPeeringStrategy {
|
||||||
// 1. Read current state: get list of connected peers (ping them)
|
// 1. Read current state: get list of connected peers (ping them)
|
||||||
let (to_ping, to_retry) = {
|
let (to_ping, to_retry) = {
|
||||||
let known_hosts = self.known_hosts.read().unwrap();
|
let known_hosts = self.known_hosts.read().unwrap();
|
||||||
debug!("known_hosts: {} peers", known_hosts.list.len());
|
trace!("known_hosts: {} peers", known_hosts.list.len());
|
||||||
|
|
||||||
let mut to_ping = vec![];
|
let mut to_ping = vec![];
|
||||||
let mut to_retry = vec![];
|
let mut to_retry = vec![];
|
||||||
for (id, info) in known_hosts.list.iter() {
|
for (id, info) in known_hosts.list.iter() {
|
||||||
debug!("{}, {:?}", hex::encode(id), info);
|
trace!("{}, {:?}", hex::encode(id), info);
|
||||||
match info.state {
|
match info.state {
|
||||||
PeerConnState::Connected => {
|
PeerConnState::Connected => {
|
||||||
let must_ping = match info.last_seen {
|
let must_ping = match info.last_seen {
|
||||||
|
|
Loading…
Reference in a new issue