diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 9a15e58..0000000 --- a/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -target -!target/release/examples/basalt diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 0ece89b..0000000 --- a/Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM archlinux:latest - -COPY target/release/examples/basalt /root/basalt - -ENV RUST_BACKTRACE=1 -ENV RUST_LOG=netapp=info,netapp::peering=debug,basalt=info - -CMD /root/basalt diff --git a/Makefile b/Makefile index 53921a9..569f4b4 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,5 @@ all: cargo build cargo build --example fullmesh - cargo build --example basalt #RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7 -docker_basalt: - cargo build --release --example basalt - docker build -t lxpz/basalt_netapp_example:$(TAG) . - docker push lxpz/basalt_netapp_example:$(TAG) diff --git a/examples/basalt.rs b/examples/basalt.rs deleted file mode 100644 index e1ac2e4..0000000 --- a/examples/basalt.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::io::Write; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; - -use log::{debug, info, warn}; - -use serde::{Deserialize, Serialize}; -use structopt::StructOpt; - -use sodiumoxide::crypto::auth; -use sodiumoxide::crypto::sign::ed25519; - -use netapp::message::*; -use netapp::peering::basalt::*; -use netapp::proto::*; -use netapp::NetApp; - -#[derive(StructOpt, Debug)] -#[structopt(name = "netapp")] -pub struct Opt { - #[structopt(long = "network-key", short = "n")] - network_key: Option, - - #[structopt(long = "private-key", short = "p")] - private_key: Option, - - #[structopt(long = "bootstrap-peer", short = "b")] - bootstrap_peers: Vec, - - #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")] - listen_addr: String, - - #[structopt(long = "public-addr", short = "a")] - public_addr: Option, - - #[structopt(long = "view-size", short = "v", default_value = "100")] - view_size: usize, - - #[structopt(long = "cache-size", short = "c", default_value = "1000")] - cache_size: usize, - - #[structopt(long = "exchange-interval-secs", short = "x", default_value = "1")] - exchange_interval: u64, - - #[structopt(long = "reset-interval-secs", short = "r", default_value = "10")] - reset_interval: u64, - - #[structopt(long = "reset-count", short = "k", default_value = "20")] - reset_count: usize, -} - -#[tokio::main] -async fn main() { - env_logger::Builder::new() - .parse_env("RUST_LOG") - .format(|buf, record| { - writeln!( - buf, - "{} {} {} {}", - chrono::Local::now().format("%s%.6f"), - record.module_path().unwrap_or("_"), - record.level(), - record.args() - ) - }) - .init(); - - let opt = Opt::from_args(); - - let netid = match &opt.network_key { - Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(), - None => auth::gen_key(), - }; - info!("KYEV NK {}", hex::encode(&netid)); - - let privkey = match &opt.private_key { - Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(), - None => { - let (_pk, sk) = ed25519::gen_keypair(); - sk - } - }; - - info!("KYEV SK {}", hex::encode(&privkey)); - info!("KYEV PK {}", hex::encode(&privkey.public_key())); - - let listen_addr = opt.listen_addr.parse().unwrap(); - let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); - let netapp = NetApp::new(listen_addr, public_addr, netid, privkey); - - let mut bootstrap_peers = vec![]; - for peer in opt.bootstrap_peers.iter() { - if let Some(delim) = peer.find('@') { - let (key, ip) = peer.split_at(delim); - let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); - let ip = ip[1..].parse::().unwrap(); - bootstrap_peers.push((pubkey, ip)); - } - } - - let basalt_params = BasaltParams { - view_size: opt.view_size, - cache_size: opt.cache_size, - exchange_interval: Duration::from_secs(opt.exchange_interval), - reset_interval: Duration::from_secs(opt.reset_interval), - reset_count: opt.reset_count, - }; - let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); - - netapp.add_msg_handler::( - |_from: ed25519::PublicKey, msg: ExampleMessage| { - debug!("Got example message: {:?}, sending example response", msg); - async { - ExampleResponse { - example_field: false, - } - } - }, - ); - - tokio::join!( - sampling_loop(netapp.clone(), peering.clone()), - netapp.listen(), - peering.run(), - ); -} - -async fn sampling_loop(netapp: Arc, basalt: Arc) { - loop { - tokio::time::delay_for(Duration::from_secs(10)).await; - - let peers = basalt.sample(10); - for p in peers { - debug!("kyev S {}", hex::encode(p)); - - let netapp2 = netapp.clone(); - tokio::spawn(async move { - match netapp2 - .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) - .await - { - Ok(resp) => debug!("Got example response: {:?}", resp), - Err(e) => warn!("Error with example request: {}", e), - } - }); - } - } -} - -#[derive(Serialize, Deserialize, Debug)] -struct ExampleMessage { - example_field: usize, -} - -#[derive(Serialize, Deserialize, Debug)] -struct ExampleResponse { - example_field: bool, -} - -impl Message for ExampleMessage { - const KIND: MessageKind = 0x99000001; - type Response = ExampleResponse; -} diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs deleted file mode 100644 index 1898dba..0000000 --- a/src/peering/basalt.rs +++ /dev/null @@ -1,476 +0,0 @@ -use std::collections::HashSet; -use std::net::SocketAddr; -use std::sync::{Arc, RwLock}; -use std::time::Duration; - -use log::{debug, info, trace, warn}; -use lru::LruCache; -use rand::{thread_rng, Rng}; -use serde::{Deserialize, Serialize}; - -use sodiumoxide::crypto::hash; -use sodiumoxide::crypto::sign::ed25519; - -use crate::message::*; -use crate::netapp::*; -use crate::proto::*; - -// -- Protocol messages -- - -#[derive(Serialize, Deserialize)] -struct PullMessage {} - -impl Message for PullMessage { - const KIND: MessageKind = 0x42001100; - type Response = PushMessage; -} - -#[derive(Serialize, Deserialize)] -struct PushMessage { - peers: Vec, -} - -impl Message for PushMessage { - const KIND: MessageKind = 0x42001101; - type Response = (); -} - -// -- Algorithm data structures -- - -type Seed = [u8; 32]; - -#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] -struct Peer { - id: ed25519::PublicKey, - addr: SocketAddr, -} - -type Cost = [u8; 40]; -const MAX_COST: Cost = [0xffu8; 40]; - -impl Peer { - fn cost(&self, seed: &Seed) -> Cost { - let mut hasher = hash::State::new(); - hasher.update(&seed[..]); - - let mut cost = [0u8; 40]; - match self.addr { - SocketAddr::V4(v4addr) => { - let v4ip = v4addr.ip().octets(); - - for i in 0..4 { - let mut h = hasher.clone(); - h.update(&v4ip[..i + 1]); - cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); - } - } - SocketAddr::V6(v6addr) => { - let v6ip = v6addr.ip().octets(); - - for i in 0..4 { - let mut h = hasher.clone(); - h.update(&v6ip[..i + 2]); - cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); - } - } - } - - { - let mut h5 = hasher.clone(); - h5.update(&format!("{} {}", self.addr, hex::encode(self.id)).into_bytes()[..]); - cost[32..40].copy_from_slice(&h5.finalize()[..8]); - } - - cost - } -} - -struct BasaltSlot { - seed: Seed, - peer: Option, -} - -impl BasaltSlot { - fn cost(&self) -> Cost { - self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST) - } -} - -struct BasaltView { - i_reset: usize, - slots: Vec, -} - -impl BasaltView { - fn new(size: usize) -> Self { - let slots = (0..size) - .map(|_| BasaltSlot { - seed: rand_seed(), - peer: None, - }) - .collect::>(); - Self { i_reset: 0, slots } - } - - fn current_peers(&self) -> HashSet { - self.slots - .iter() - .filter(|s| s.peer.is_some()) - .map(|s| s.peer.unwrap().clone()) - .collect::>() - } - fn current_peers_vec(&self) -> Vec { - self.current_peers().drain().collect::>() - } - - fn sample(&self, count: usize) -> Vec { - let possibles = self - .slots - .iter() - .enumerate() - .filter(|(_i, s)| s.peer.is_some()) - .map(|(i, _s)| i) - .collect::>(); - if possibles.len() == 0 { - vec![] - } else { - let mut ret = vec![]; - let mut rng = thread_rng(); - for _i in 0..count { - let idx = rng.gen_range(0, possibles.len()); - ret.push(self.slots[possibles[idx]].peer.unwrap()); - } - ret - } - } - - fn update_slot(&mut self, i: usize, peers: &[Peer]) { - let mut slot_cost = self.slots[i].cost(); - - for peer in peers.iter() { - let peer_cost = peer.cost(&self.slots[i].seed); - if self.slots[i].peer.is_none() || peer_cost < slot_cost { - trace!("Best match for slot {}: {}@{} (cost {})", i, hex::encode(peer.id), peer.addr, hex::encode(peer_cost)); - self.slots[i].peer = Some(*peer); - slot_cost = peer_cost; - } - } - } - fn update_all_slots(&mut self, peers: &[Peer]) { - for i in 0..self.slots.len() { - self.update_slot(i, peers); - } - } - - fn disconnected(&mut self, id: ed25519::PublicKey) { - let mut cleared_slots = vec![]; - for i in 0..self.slots.len() { - if let Some(p) = self.slots[i].peer { - if p.id == id { - self.slots[i].peer = None; - cleared_slots.push(i); - } - } - } - - let remaining_peers = self.current_peers_vec(); - - for i in cleared_slots { - self.update_slot(i, &remaining_peers[..]); - } - } - - fn should_try_list(&self, peers: &[Peer]) -> Vec { - // Select peers that have lower cost than any of our slots - let mut ret = HashSet::new(); - - for i in 0..self.slots.len() { - if self.slots[i].peer.is_none() { - return peers.to_vec(); - } - let mut min_cost = self.slots[i].cost(); - let mut min_peer = None; - for peer in peers.iter() { - if ret.contains(peer) { - continue; - } - let peer_cost = peer.cost(&self.slots[i].seed); - if peer_cost < min_cost { - min_cost = peer_cost; - min_peer = Some(*peer); - } - } - if let Some(p) = min_peer { - ret.insert(p); - if ret.len() == peers.len() { - break; - } - } - } - - ret.drain().collect::>() - } - - fn reset_some_slots(&mut self, count: usize) { - for _i in 0..count { - trace!("Reset slot {}", self.i_reset); - self.slots[self.i_reset].seed = rand_seed(); - self.i_reset = (self.i_reset + 1) % self.slots.len(); - } - } -} - -pub struct BasaltParams { - pub view_size: usize, - pub cache_size: usize, - pub exchange_interval: Duration, - pub reset_interval: Duration, - pub reset_count: usize, -} - -pub struct Basalt { - netapp: Arc, - - param: BasaltParams, - bootstrap_peers: Vec, - - view: RwLock, - current_attempts: RwLock>, - backlog: RwLock>, -} - -impl Basalt { - pub fn new( - netapp: Arc, - bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, - param: BasaltParams, - ) -> Arc { - let bootstrap_peers = bootstrap_list - .iter() - .map(|(id, addr)| Peer { - id: *id, - addr: *addr, - }) - .collect::>(); - - let view = BasaltView::new(param.view_size); - let backlog = LruCache::new(param.cache_size); - - let basalt = Arc::new(Self { - netapp: netapp.clone(), - param, - bootstrap_peers, - view: RwLock::new(view), - current_attempts: RwLock::new(HashSet::new()), - backlog: RwLock::new(backlog), - }); - - let basalt2 = basalt.clone(); - netapp.on_connected( - move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { - basalt2.on_connected(pk, addr, is_incoming); - }, - ); - - let basalt2 = basalt.clone(); - netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| { - basalt2.on_disconnected(pk, is_incoming); - }); - - let basalt2 = basalt.clone(); - netapp.add_msg_handler::( - move |_from: ed25519::PublicKey, _pullmsg: PullMessage| { - let push_msg = basalt2.make_push_message(); - async move { push_msg } - }, - ); - - let basalt2 = basalt.clone(); - netapp.add_msg_handler::( - move |_from: ed25519::PublicKey, push_msg: PushMessage| { - basalt2.handle_peer_list(&push_msg.peers[..]); - async move { () } - }, - ); - - basalt - } - - pub fn sample(&self, count: usize) -> Vec { - self.view - .read() - .unwrap() - .sample(count) - .iter() - .map(|p| { - debug!("KYEV S {}", hex::encode(p.id)); - p.id - }) - .collect::>() - } - - pub async fn run(self: Arc) { - for peer in self.bootstrap_peers.iter() { - tokio::spawn(self.clone().try_connect(*peer)); - } - - let pushpull_loop = self.clone().run_pushpull_loop(); - let reset_loop = self.run_reset_loop(); - tokio::join!(pushpull_loop, reset_loop); - } - - async fn run_pushpull_loop(self: Arc) { - loop { - tokio::time::delay_for(self.param.exchange_interval).await; - - let peers = self.view.read().unwrap().sample(2); - if peers.len() == 2 { - tokio::spawn(self.clone().do_pull(peers[0].id)); - tokio::spawn(self.clone().do_push(peers[1].id)); - } - } - } - - async fn do_pull(self: Arc, peer: ed25519::PublicKey) { - match self - .netapp - .request(&peer, PullMessage {}, PRIO_NORMAL) - .await - { - Ok(resp) => { - self.handle_peer_list(&resp.peers[..]); - trace!("KYEV PEXi {}", hex::encode(peer)); - } - Err(e) => { - warn!("Error during pull exchange: {}", e); - } - }; - } - - async fn do_push(self: Arc, peer: ed25519::PublicKey) { - let push_msg = self.make_push_message(); - match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { - Ok(_) => { - trace!("KYEV PEXo {}", hex::encode(peer)); - } - Err(e) => { - warn!("Error during push exchange: {}", e); - } - } - } - - fn make_push_message(&self) -> PushMessage { - let current_peers = self.view.read().unwrap().current_peers_vec(); - PushMessage { - peers: current_peers, - } - } - - async fn run_reset_loop(self: Arc) { - loop { - tokio::time::delay_for(self.param.reset_interval).await; - - { - debug!("KYEV R {}", self.param.reset_count); - - let mut view = self.view.write().unwrap(); - let prev_peers = view.current_peers(); - let prev_peers_vec = prev_peers.iter().cloned().collect::>(); - - view.reset_some_slots(self.param.reset_count); - view.update_all_slots(&prev_peers_vec[..]); - - let new_peers = view.current_peers(); - drop(view); - - self.close_all_diff(&prev_peers, &new_peers); - } - - let mut to_retry_maybe = self.bootstrap_peers.clone(); - for (peer, _) in self.backlog.read().unwrap().iter() { - if !self.bootstrap_peers.contains(peer) { - to_retry_maybe.push(*peer); - } - } - self.handle_peer_list(&to_retry_maybe[..]); - } - } - - fn handle_peer_list(self: &Arc, peers: &[Peer]) { - let to_connect = self.view.read().unwrap().should_try_list(peers); - - for peer in to_connect.iter() { - tokio::spawn(self.clone().try_connect(*peer)); - } - } - - async fn try_connect(self: Arc, peer: Peer) { - { - let view = self.view.read().unwrap(); - let mut attempts = self.current_attempts.write().unwrap(); - - if view.slots.iter().any(|x| x.peer == Some(peer)) { - return; - } - if attempts.contains(&peer) { - return; - } - - attempts.insert(peer); - } - let res = self.netapp.clone().try_connect(peer.addr, peer.id).await; - trace!("Connection attempt to {}: {:?}", peer.addr, res); - - self.current_attempts.write().unwrap().remove(&peer); - - if res.is_err() { - self.backlog.write().unwrap().pop(&peer); - } - } - - fn on_connected(self: &Arc, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) { - if is_incoming { - self.handle_peer_list(&[Peer { id: pk, addr }][..]); - } else { - info!("KYEV C {} {}", hex::encode(pk), addr); - let peer = Peer { id: pk, addr }; - - let mut backlog = self.backlog.write().unwrap(); - if backlog.get(&peer).is_none() { - backlog.put(peer, ()); - } - drop(backlog); - - let mut view = self.view.write().unwrap(); - let prev_peers = view.current_peers(); - - view.update_all_slots(&[peer][..]); - - let new_peers = view.current_peers(); - drop(view); - - self.close_all_diff(&prev_peers, &new_peers); - } - } - - fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) { - if !is_incoming { - info!("KYEV D {}", hex::encode(pk)); - self.view.write().unwrap().disconnected(pk); - } - } - - fn close_all_diff(&self, prev_peers: &HashSet, new_peers: &HashSet) { - for peer in prev_peers.iter() { - if !new_peers.contains(peer) { - self.netapp.disconnect(&peer.id); - } - } - } -} - -fn rand_seed() -> Seed { - let mut seed = [0u8; 32]; - sodiumoxide::randombytes::randombytes_into(&mut seed[..]); - seed -} diff --git a/src/peering/mod.rs b/src/peering/mod.rs index beb2e18..044b1df 100644 --- a/src/peering/mod.rs +++ b/src/peering/mod.rs @@ -1,2 +1 @@ -pub mod basalt; pub mod fullmesh;