forked from lx/netapp
We don't want to publish this
This commit is contained in:
parent
dbb15807b9
commit
3aa46dfe95
6 changed files with 0 additions and 656 deletions
|
@ -1,2 +0,0 @@
|
||||||
target
|
|
||||||
!target/release/examples/basalt
|
|
|
@ -1,8 +0,0 @@
|
||||||
FROM archlinux:latest
|
|
||||||
|
|
||||||
COPY target/release/examples/basalt /root/basalt
|
|
||||||
|
|
||||||
ENV RUST_BACKTRACE=1
|
|
||||||
ENV RUST_LOG=netapp=info,netapp::peering=debug,basalt=info
|
|
||||||
|
|
||||||
CMD /root/basalt
|
|
5
Makefile
5
Makefile
|
@ -1,10 +1,5 @@
|
||||||
all:
|
all:
|
||||||
cargo build
|
cargo build
|
||||||
cargo build --example fullmesh
|
cargo build --example fullmesh
|
||||||
cargo build --example basalt
|
|
||||||
#RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7
|
#RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7
|
||||||
|
|
||||||
docker_basalt:
|
|
||||||
cargo build --release --example basalt
|
|
||||||
docker build -t lxpz/basalt_netapp_example:$(TAG) .
|
|
||||||
docker push lxpz/basalt_netapp_example:$(TAG)
|
|
||||||
|
|
|
@ -1,164 +0,0 @@
|
||||||
use std::io::Write;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use log::{debug, info, warn};
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use structopt::StructOpt;
|
|
||||||
|
|
||||||
use sodiumoxide::crypto::auth;
|
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
|
||||||
|
|
||||||
use netapp::message::*;
|
|
||||||
use netapp::peering::basalt::*;
|
|
||||||
use netapp::proto::*;
|
|
||||||
use netapp::NetApp;
|
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
|
||||||
#[structopt(name = "netapp")]
|
|
||||||
pub struct Opt {
|
|
||||||
#[structopt(long = "network-key", short = "n")]
|
|
||||||
network_key: Option<String>,
|
|
||||||
|
|
||||||
#[structopt(long = "private-key", short = "p")]
|
|
||||||
private_key: Option<String>,
|
|
||||||
|
|
||||||
#[structopt(long = "bootstrap-peer", short = "b")]
|
|
||||||
bootstrap_peers: Vec<String>,
|
|
||||||
|
|
||||||
#[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")]
|
|
||||||
listen_addr: String,
|
|
||||||
|
|
||||||
#[structopt(long = "public-addr", short = "a")]
|
|
||||||
public_addr: Option<String>,
|
|
||||||
|
|
||||||
#[structopt(long = "view-size", short = "v", default_value = "100")]
|
|
||||||
view_size: usize,
|
|
||||||
|
|
||||||
#[structopt(long = "cache-size", short = "c", default_value = "1000")]
|
|
||||||
cache_size: usize,
|
|
||||||
|
|
||||||
#[structopt(long = "exchange-interval-secs", short = "x", default_value = "1")]
|
|
||||||
exchange_interval: u64,
|
|
||||||
|
|
||||||
#[structopt(long = "reset-interval-secs", short = "r", default_value = "10")]
|
|
||||||
reset_interval: u64,
|
|
||||||
|
|
||||||
#[structopt(long = "reset-count", short = "k", default_value = "20")]
|
|
||||||
reset_count: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() {
|
|
||||||
env_logger::Builder::new()
|
|
||||||
.parse_env("RUST_LOG")
|
|
||||||
.format(|buf, record| {
|
|
||||||
writeln!(
|
|
||||||
buf,
|
|
||||||
"{} {} {} {}",
|
|
||||||
chrono::Local::now().format("%s%.6f"),
|
|
||||||
record.module_path().unwrap_or("_"),
|
|
||||||
record.level(),
|
|
||||||
record.args()
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.init();
|
|
||||||
|
|
||||||
let opt = Opt::from_args();
|
|
||||||
|
|
||||||
let netid = match &opt.network_key {
|
|
||||||
Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(),
|
|
||||||
None => auth::gen_key(),
|
|
||||||
};
|
|
||||||
info!("KYEV NK {}", hex::encode(&netid));
|
|
||||||
|
|
||||||
let privkey = match &opt.private_key {
|
|
||||||
Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(),
|
|
||||||
None => {
|
|
||||||
let (_pk, sk) = ed25519::gen_keypair();
|
|
||||||
sk
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("KYEV SK {}", hex::encode(&privkey));
|
|
||||||
info!("KYEV PK {}", hex::encode(&privkey.public_key()));
|
|
||||||
|
|
||||||
let listen_addr = opt.listen_addr.parse().unwrap();
|
|
||||||
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
|
||||||
let netapp = NetApp::new(listen_addr, public_addr, netid, privkey);
|
|
||||||
|
|
||||||
let mut bootstrap_peers = vec![];
|
|
||||||
for peer in opt.bootstrap_peers.iter() {
|
|
||||||
if let Some(delim) = peer.find('@') {
|
|
||||||
let (key, ip) = peer.split_at(delim);
|
|
||||||
let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap();
|
|
||||||
let ip = ip[1..].parse::<SocketAddr>().unwrap();
|
|
||||||
bootstrap_peers.push((pubkey, ip));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let basalt_params = BasaltParams {
|
|
||||||
view_size: opt.view_size,
|
|
||||||
cache_size: opt.cache_size,
|
|
||||||
exchange_interval: Duration::from_secs(opt.exchange_interval),
|
|
||||||
reset_interval: Duration::from_secs(opt.reset_interval),
|
|
||||||
reset_count: opt.reset_count,
|
|
||||||
};
|
|
||||||
let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
|
|
||||||
|
|
||||||
netapp.add_msg_handler::<ExampleMessage, _, _>(
|
|
||||||
|_from: ed25519::PublicKey, msg: ExampleMessage| {
|
|
||||||
debug!("Got example message: {:?}, sending example response", msg);
|
|
||||||
async {
|
|
||||||
ExampleResponse {
|
|
||||||
example_field: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
tokio::join!(
|
|
||||||
sampling_loop(netapp.clone(), peering.clone()),
|
|
||||||
netapp.listen(),
|
|
||||||
peering.run(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn sampling_loop(netapp: Arc<NetApp>, basalt: Arc<Basalt>) {
|
|
||||||
loop {
|
|
||||||
tokio::time::delay_for(Duration::from_secs(10)).await;
|
|
||||||
|
|
||||||
let peers = basalt.sample(10);
|
|
||||||
for p in peers {
|
|
||||||
debug!("kyev S {}", hex::encode(p));
|
|
||||||
|
|
||||||
let netapp2 = netapp.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
match netapp2
|
|
||||||
.request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(resp) => debug!("Got example response: {:?}", resp),
|
|
||||||
Err(e) => warn!("Error with example request: {}", e),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
struct ExampleMessage {
|
|
||||||
example_field: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
|
||||||
struct ExampleResponse {
|
|
||||||
example_field: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Message for ExampleMessage {
|
|
||||||
const KIND: MessageKind = 0x99000001;
|
|
||||||
type Response = ExampleResponse;
|
|
||||||
}
|
|
|
@ -1,476 +0,0 @@
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use log::{debug, info, trace, warn};
|
|
||||||
use lru::LruCache;
|
|
||||||
use rand::{thread_rng, Rng};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
use sodiumoxide::crypto::hash;
|
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
|
||||||
|
|
||||||
use crate::message::*;
|
|
||||||
use crate::netapp::*;
|
|
||||||
use crate::proto::*;
|
|
||||||
|
|
||||||
// -- Protocol messages --
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct PullMessage {}
|
|
||||||
|
|
||||||
impl Message for PullMessage {
|
|
||||||
const KIND: MessageKind = 0x42001100;
|
|
||||||
type Response = PushMessage;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct PushMessage {
|
|
||||||
peers: Vec<Peer>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Message for PushMessage {
|
|
||||||
const KIND: MessageKind = 0x42001101;
|
|
||||||
type Response = ();
|
|
||||||
}
|
|
||||||
|
|
||||||
// -- Algorithm data structures --
|
|
||||||
|
|
||||||
type Seed = [u8; 32];
|
|
||||||
|
|
||||||
#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
|
|
||||||
struct Peer {
|
|
||||||
id: ed25519::PublicKey,
|
|
||||||
addr: SocketAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
type Cost = [u8; 40];
|
|
||||||
const MAX_COST: Cost = [0xffu8; 40];
|
|
||||||
|
|
||||||
impl Peer {
|
|
||||||
fn cost(&self, seed: &Seed) -> Cost {
|
|
||||||
let mut hasher = hash::State::new();
|
|
||||||
hasher.update(&seed[..]);
|
|
||||||
|
|
||||||
let mut cost = [0u8; 40];
|
|
||||||
match self.addr {
|
|
||||||
SocketAddr::V4(v4addr) => {
|
|
||||||
let v4ip = v4addr.ip().octets();
|
|
||||||
|
|
||||||
for i in 0..4 {
|
|
||||||
let mut h = hasher.clone();
|
|
||||||
h.update(&v4ip[..i + 1]);
|
|
||||||
cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SocketAddr::V6(v6addr) => {
|
|
||||||
let v6ip = v6addr.ip().octets();
|
|
||||||
|
|
||||||
for i in 0..4 {
|
|
||||||
let mut h = hasher.clone();
|
|
||||||
h.update(&v6ip[..i + 2]);
|
|
||||||
cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut h5 = hasher.clone();
|
|
||||||
h5.update(&format!("{} {}", self.addr, hex::encode(self.id)).into_bytes()[..]);
|
|
||||||
cost[32..40].copy_from_slice(&h5.finalize()[..8]);
|
|
||||||
}
|
|
||||||
|
|
||||||
cost
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct BasaltSlot {
|
|
||||||
seed: Seed,
|
|
||||||
peer: Option<Peer>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BasaltSlot {
|
|
||||||
fn cost(&self) -> Cost {
|
|
||||||
self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct BasaltView {
|
|
||||||
i_reset: usize,
|
|
||||||
slots: Vec<BasaltSlot>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BasaltView {
|
|
||||||
fn new(size: usize) -> Self {
|
|
||||||
let slots = (0..size)
|
|
||||||
.map(|_| BasaltSlot {
|
|
||||||
seed: rand_seed(),
|
|
||||||
peer: None,
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
Self { i_reset: 0, slots }
|
|
||||||
}
|
|
||||||
|
|
||||||
fn current_peers(&self) -> HashSet<Peer> {
|
|
||||||
self.slots
|
|
||||||
.iter()
|
|
||||||
.filter(|s| s.peer.is_some())
|
|
||||||
.map(|s| s.peer.unwrap().clone())
|
|
||||||
.collect::<HashSet<_>>()
|
|
||||||
}
|
|
||||||
fn current_peers_vec(&self) -> Vec<Peer> {
|
|
||||||
self.current_peers().drain().collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn sample(&self, count: usize) -> Vec<Peer> {
|
|
||||||
let possibles = self
|
|
||||||
.slots
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.filter(|(_i, s)| s.peer.is_some())
|
|
||||||
.map(|(i, _s)| i)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
if possibles.len() == 0 {
|
|
||||||
vec![]
|
|
||||||
} else {
|
|
||||||
let mut ret = vec![];
|
|
||||||
let mut rng = thread_rng();
|
|
||||||
for _i in 0..count {
|
|
||||||
let idx = rng.gen_range(0, possibles.len());
|
|
||||||
ret.push(self.slots[possibles[idx]].peer.unwrap());
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn update_slot(&mut self, i: usize, peers: &[Peer]) {
|
|
||||||
let mut slot_cost = self.slots[i].cost();
|
|
||||||
|
|
||||||
for peer in peers.iter() {
|
|
||||||
let peer_cost = peer.cost(&self.slots[i].seed);
|
|
||||||
if self.slots[i].peer.is_none() || peer_cost < slot_cost {
|
|
||||||
trace!("Best match for slot {}: {}@{} (cost {})", i, hex::encode(peer.id), peer.addr, hex::encode(peer_cost));
|
|
||||||
self.slots[i].peer = Some(*peer);
|
|
||||||
slot_cost = peer_cost;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fn update_all_slots(&mut self, peers: &[Peer]) {
|
|
||||||
for i in 0..self.slots.len() {
|
|
||||||
self.update_slot(i, peers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn disconnected(&mut self, id: ed25519::PublicKey) {
|
|
||||||
let mut cleared_slots = vec![];
|
|
||||||
for i in 0..self.slots.len() {
|
|
||||||
if let Some(p) = self.slots[i].peer {
|
|
||||||
if p.id == id {
|
|
||||||
self.slots[i].peer = None;
|
|
||||||
cleared_slots.push(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let remaining_peers = self.current_peers_vec();
|
|
||||||
|
|
||||||
for i in cleared_slots {
|
|
||||||
self.update_slot(i, &remaining_peers[..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn should_try_list(&self, peers: &[Peer]) -> Vec<Peer> {
|
|
||||||
// Select peers that have lower cost than any of our slots
|
|
||||||
let mut ret = HashSet::new();
|
|
||||||
|
|
||||||
for i in 0..self.slots.len() {
|
|
||||||
if self.slots[i].peer.is_none() {
|
|
||||||
return peers.to_vec();
|
|
||||||
}
|
|
||||||
let mut min_cost = self.slots[i].cost();
|
|
||||||
let mut min_peer = None;
|
|
||||||
for peer in peers.iter() {
|
|
||||||
if ret.contains(peer) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let peer_cost = peer.cost(&self.slots[i].seed);
|
|
||||||
if peer_cost < min_cost {
|
|
||||||
min_cost = peer_cost;
|
|
||||||
min_peer = Some(*peer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(p) = min_peer {
|
|
||||||
ret.insert(p);
|
|
||||||
if ret.len() == peers.len() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ret.drain().collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reset_some_slots(&mut self, count: usize) {
|
|
||||||
for _i in 0..count {
|
|
||||||
trace!("Reset slot {}", self.i_reset);
|
|
||||||
self.slots[self.i_reset].seed = rand_seed();
|
|
||||||
self.i_reset = (self.i_reset + 1) % self.slots.len();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct BasaltParams {
|
|
||||||
pub view_size: usize,
|
|
||||||
pub cache_size: usize,
|
|
||||||
pub exchange_interval: Duration,
|
|
||||||
pub reset_interval: Duration,
|
|
||||||
pub reset_count: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Basalt {
|
|
||||||
netapp: Arc<NetApp>,
|
|
||||||
|
|
||||||
param: BasaltParams,
|
|
||||||
bootstrap_peers: Vec<Peer>,
|
|
||||||
|
|
||||||
view: RwLock<BasaltView>,
|
|
||||||
current_attempts: RwLock<HashSet<Peer>>,
|
|
||||||
backlog: RwLock<LruCache<Peer, ()>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Basalt {
|
|
||||||
pub fn new(
|
|
||||||
netapp: Arc<NetApp>,
|
|
||||||
bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
|
|
||||||
param: BasaltParams,
|
|
||||||
) -> Arc<Self> {
|
|
||||||
let bootstrap_peers = bootstrap_list
|
|
||||||
.iter()
|
|
||||||
.map(|(id, addr)| Peer {
|
|
||||||
id: *id,
|
|
||||||
addr: *addr,
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let view = BasaltView::new(param.view_size);
|
|
||||||
let backlog = LruCache::new(param.cache_size);
|
|
||||||
|
|
||||||
let basalt = Arc::new(Self {
|
|
||||||
netapp: netapp.clone(),
|
|
||||||
param,
|
|
||||||
bootstrap_peers,
|
|
||||||
view: RwLock::new(view),
|
|
||||||
current_attempts: RwLock::new(HashSet::new()),
|
|
||||||
backlog: RwLock::new(backlog),
|
|
||||||
});
|
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
|
||||||
netapp.on_connected(
|
|
||||||
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
|
|
||||||
basalt2.on_connected(pk, addr, is_incoming);
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
|
||||||
netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| {
|
|
||||||
basalt2.on_disconnected(pk, is_incoming);
|
|
||||||
});
|
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
|
||||||
netapp.add_msg_handler::<PullMessage, _, _>(
|
|
||||||
move |_from: ed25519::PublicKey, _pullmsg: PullMessage| {
|
|
||||||
let push_msg = basalt2.make_push_message();
|
|
||||||
async move { push_msg }
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
|
||||||
netapp.add_msg_handler::<PushMessage, _, _>(
|
|
||||||
move |_from: ed25519::PublicKey, push_msg: PushMessage| {
|
|
||||||
basalt2.handle_peer_list(&push_msg.peers[..]);
|
|
||||||
async move { () }
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
basalt
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn sample(&self, count: usize) -> Vec<ed25519::PublicKey> {
|
|
||||||
self.view
|
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.sample(count)
|
|
||||||
.iter()
|
|
||||||
.map(|p| {
|
|
||||||
debug!("KYEV S {}", hex::encode(p.id));
|
|
||||||
p.id
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run(self: Arc<Self>) {
|
|
||||||
for peer in self.bootstrap_peers.iter() {
|
|
||||||
tokio::spawn(self.clone().try_connect(*peer));
|
|
||||||
}
|
|
||||||
|
|
||||||
let pushpull_loop = self.clone().run_pushpull_loop();
|
|
||||||
let reset_loop = self.run_reset_loop();
|
|
||||||
tokio::join!(pushpull_loop, reset_loop);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_pushpull_loop(self: Arc<Self>) {
|
|
||||||
loop {
|
|
||||||
tokio::time::delay_for(self.param.exchange_interval).await;
|
|
||||||
|
|
||||||
let peers = self.view.read().unwrap().sample(2);
|
|
||||||
if peers.len() == 2 {
|
|
||||||
tokio::spawn(self.clone().do_pull(peers[0].id));
|
|
||||||
tokio::spawn(self.clone().do_push(peers[1].id));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) {
|
|
||||||
match self
|
|
||||||
.netapp
|
|
||||||
.request(&peer, PullMessage {}, PRIO_NORMAL)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(resp) => {
|
|
||||||
self.handle_peer_list(&resp.peers[..]);
|
|
||||||
trace!("KYEV PEXi {}", hex::encode(peer));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error during pull exchange: {}", e);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) {
|
|
||||||
let push_msg = self.make_push_message();
|
|
||||||
match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
|
|
||||||
Ok(_) => {
|
|
||||||
trace!("KYEV PEXo {}", hex::encode(peer));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error during push exchange: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_push_message(&self) -> PushMessage {
|
|
||||||
let current_peers = self.view.read().unwrap().current_peers_vec();
|
|
||||||
PushMessage {
|
|
||||||
peers: current_peers,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_reset_loop(self: Arc<Self>) {
|
|
||||||
loop {
|
|
||||||
tokio::time::delay_for(self.param.reset_interval).await;
|
|
||||||
|
|
||||||
{
|
|
||||||
debug!("KYEV R {}", self.param.reset_count);
|
|
||||||
|
|
||||||
let mut view = self.view.write().unwrap();
|
|
||||||
let prev_peers = view.current_peers();
|
|
||||||
let prev_peers_vec = prev_peers.iter().cloned().collect::<Vec<_>>();
|
|
||||||
|
|
||||||
view.reset_some_slots(self.param.reset_count);
|
|
||||||
view.update_all_slots(&prev_peers_vec[..]);
|
|
||||||
|
|
||||||
let new_peers = view.current_peers();
|
|
||||||
drop(view);
|
|
||||||
|
|
||||||
self.close_all_diff(&prev_peers, &new_peers);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut to_retry_maybe = self.bootstrap_peers.clone();
|
|
||||||
for (peer, _) in self.backlog.read().unwrap().iter() {
|
|
||||||
if !self.bootstrap_peers.contains(peer) {
|
|
||||||
to_retry_maybe.push(*peer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
self.handle_peer_list(&to_retry_maybe[..]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_peer_list(self: &Arc<Self>, peers: &[Peer]) {
|
|
||||||
let to_connect = self.view.read().unwrap().should_try_list(peers);
|
|
||||||
|
|
||||||
for peer in to_connect.iter() {
|
|
||||||
tokio::spawn(self.clone().try_connect(*peer));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn try_connect(self: Arc<Self>, peer: Peer) {
|
|
||||||
{
|
|
||||||
let view = self.view.read().unwrap();
|
|
||||||
let mut attempts = self.current_attempts.write().unwrap();
|
|
||||||
|
|
||||||
if view.slots.iter().any(|x| x.peer == Some(peer)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if attempts.contains(&peer) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
attempts.insert(peer);
|
|
||||||
}
|
|
||||||
let res = self.netapp.clone().try_connect(peer.addr, peer.id).await;
|
|
||||||
trace!("Connection attempt to {}: {:?}", peer.addr, res);
|
|
||||||
|
|
||||||
self.current_attempts.write().unwrap().remove(&peer);
|
|
||||||
|
|
||||||
if res.is_err() {
|
|
||||||
self.backlog.write().unwrap().pop(&peer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) {
|
|
||||||
if is_incoming {
|
|
||||||
self.handle_peer_list(&[Peer { id: pk, addr }][..]);
|
|
||||||
} else {
|
|
||||||
info!("KYEV C {} {}", hex::encode(pk), addr);
|
|
||||||
let peer = Peer { id: pk, addr };
|
|
||||||
|
|
||||||
let mut backlog = self.backlog.write().unwrap();
|
|
||||||
if backlog.get(&peer).is_none() {
|
|
||||||
backlog.put(peer, ());
|
|
||||||
}
|
|
||||||
drop(backlog);
|
|
||||||
|
|
||||||
let mut view = self.view.write().unwrap();
|
|
||||||
let prev_peers = view.current_peers();
|
|
||||||
|
|
||||||
view.update_all_slots(&[peer][..]);
|
|
||||||
|
|
||||||
let new_peers = view.current_peers();
|
|
||||||
drop(view);
|
|
||||||
|
|
||||||
self.close_all_diff(&prev_peers, &new_peers);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) {
|
|
||||||
if !is_incoming {
|
|
||||||
info!("KYEV D {}", hex::encode(pk));
|
|
||||||
self.view.write().unwrap().disconnected(pk);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
|
|
||||||
for peer in prev_peers.iter() {
|
|
||||||
if !new_peers.contains(peer) {
|
|
||||||
self.netapp.disconnect(&peer.id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn rand_seed() -> Seed {
|
|
||||||
let mut seed = [0u8; 32];
|
|
||||||
sodiumoxide::randombytes::randombytes_into(&mut seed[..]);
|
|
||||||
seed
|
|
||||||
}
|
|
|
@ -1,2 +1 @@
|
||||||
pub mod basalt;
|
|
||||||
pub mod fullmesh;
|
pub mod fullmesh;
|
||||||
|
|
Loading…
Reference in a new issue