diff --git a/.drone.yml b/.drone.yml index cf4cc3e..aa99d4c 100644 --- a/.drone.yml +++ b/.drone.yml @@ -27,6 +27,8 @@ steps: - apt-get update - apt-get install --yes libsodium-dev - cargo build + - cargo build --example fullmesh + - cargo build --example basalt - name: rebuild-cache image: meltwater/drone-cache:dev diff --git a/README.md b/README.md index d39de5f..9e56c9f 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,6 @@ Netapp is a Rust library that takes care of a few common tasks in distributed so - peer discovery - query/response message passing model for communications - multiplexing transfers over a connection -- overlay networks: full mesh, and soon other methods +- overlay networks: full mesh, and byzantine-tolerant random peer sampling using [Bᴀsᴀʟᴛ](https://arxiv.org/abs/2102.04063). See examples folder to learn how to use netapp. diff --git a/examples/basalt.rs b/examples/basalt.rs new file mode 100644 index 0000000..4ea4f71 --- /dev/null +++ b/examples/basalt.rs @@ -0,0 +1,164 @@ +use std::io::Write; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use log::{debug, info, warn}; + +use serde::{Deserialize, Serialize}; +use structopt::StructOpt; + +use sodiumoxide::crypto::auth; +use sodiumoxide::crypto::sign::ed25519; + +use netapp::message::*; +use netapp::peering::basalt::*; +use netapp::proto::*; +use netapp::NetApp; + +#[derive(StructOpt, Debug)] +#[structopt(name = "netapp")] +pub struct Opt { + #[structopt(long = "network-key", short = "n")] + network_key: Option, + + #[structopt(long = "private-key", short = "p")] + private_key: Option, + + #[structopt(long = "bootstrap-peer", short = "b")] + bootstrap_peers: Vec, + + #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")] + listen_addr: String, + + #[structopt(long = "public-addr", short = "a")] + public_addr: Option, + + #[structopt(long = "view-size", short = "v", default_value = "100")] + view_size: usize, + + #[structopt(long = "cache-size", short = "c", default_value = "1000")] + cache_size: usize, + + #[structopt(long = "exchange-interval-secs", short = "x", default_value = "1")] + exchange_interval: u64, + + #[structopt(long = "reset-interval-secs", short = "r", default_value = "10")] + reset_interval: u64, + + #[structopt(long = "reset-count", short = "k", default_value = "20")] + reset_count: usize, +} + +#[tokio::main] +async fn main() { + env_logger::Builder::new() + .parse_env("RUST_LOG") + .format(|buf, record| { + writeln!( + buf, + "{} {} {} {}", + chrono::Local::now().format("%s%.6f"), + record.module_path().unwrap_or("_"), + record.level(), + record.args() + ) + }) + .init(); + + let opt = Opt::from_args(); + + let netid = match &opt.network_key { + Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => auth::gen_key(), + }; + info!("KYEV NK {}", hex::encode(&netid)); + + let privkey = match &opt.private_key { + Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => { + let (_pk, sk) = ed25519::gen_keypair(); + sk + } + }; + + info!("KYEV SK {}", hex::encode(&privkey)); + info!("KYEV PK {}", hex::encode(&privkey.public_key())); + + let netapp = NetApp::new(netid, privkey); + + let mut bootstrap_peers = vec![]; + for peer in opt.bootstrap_peers.iter() { + if let Some(delim) = peer.find('@') { + let (key, ip) = peer.split_at(delim); + let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); + let ip = ip[1..].parse::().unwrap(); + bootstrap_peers.push((pubkey, ip)); + } + } + + let basalt_params = BasaltParams { + view_size: opt.view_size, + cache_size: opt.cache_size, + exchange_interval: Duration::from_secs(opt.exchange_interval), + reset_interval: Duration::from_secs(opt.reset_interval), + reset_count: opt.reset_count, + }; + let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); + + netapp.add_msg_handler::( + |_from: ed25519::PublicKey, msg: ExampleMessage| { + debug!("Got example message: {:?}, sending example response", msg); + async { + ExampleResponse { + example_field: false, + } + } + }, + ); + + let listen_addr = opt.listen_addr.parse().unwrap(); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); + tokio::join!( + sampling_loop(netapp.clone(), peering.clone()), + netapp.listen(listen_addr, public_addr), + peering.run(), + ); +} + +async fn sampling_loop(netapp: Arc, basalt: Arc) { + loop { + tokio::time::delay_for(Duration::from_secs(10)).await; + + let peers = basalt.sample(10); + for p in peers { + debug!("kyev S {}", hex::encode(p)); + + let netapp2 = netapp.clone(); + tokio::spawn(async move { + match netapp2 + .request(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL) + .await + { + Ok(resp) => debug!("Got example response: {:?}", resp), + Err(e) => warn!("Error with example request: {}", e), + } + }); + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleMessage { + example_field: usize, +} + +#[derive(Serialize, Deserialize, Debug)] +struct ExampleResponse { + example_field: bool, +} + +impl Message for ExampleMessage { + const KIND: MessageKind = 0x99000001; + type Response = ExampleResponse; +} diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs new file mode 100644 index 0000000..3c1fc9e --- /dev/null +++ b/src/peering/basalt.rs @@ -0,0 +1,476 @@ +use std::collections::HashSet; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use log::{debug, info, trace, warn}; +use lru::LruCache; +use rand::{thread_rng, Rng}; +use serde::{Deserialize, Serialize}; + +use sodiumoxide::crypto::hash; + +use crate::message::*; +use crate::netapp::*; +use crate::proto::*; +use crate::NodeID; + +// -- Protocol messages -- + +#[derive(Serialize, Deserialize)] +struct PullMessage {} + +impl Message for PullMessage { + const KIND: MessageKind = 0x42001100; + type Response = PushMessage; +} + +#[derive(Serialize, Deserialize)] +struct PushMessage { + peers: Vec, +} + +impl Message for PushMessage { + const KIND: MessageKind = 0x42001101; + type Response = (); +} + +// -- Algorithm data structures -- + +type Seed = [u8; 32]; + +#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] +struct Peer { + id: NodeID, + addr: SocketAddr, +} + +type Cost = [u8; 40]; +const MAX_COST: Cost = [0xffu8; 40]; + +impl Peer { + fn cost(&self, seed: &Seed) -> Cost { + let mut hasher = hash::State::new(); + hasher.update(&seed[..]); + + let mut cost = [0u8; 40]; + match self.addr { + SocketAddr::V4(v4addr) => { + let v4ip = v4addr.ip().octets(); + + for i in 0..4 { + let mut h = hasher.clone(); + h.update(&v4ip[..i + 1]); + cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + } + } + SocketAddr::V6(v6addr) => { + let v6ip = v6addr.ip().octets(); + + for i in 0..4 { + let mut h = hasher.clone(); + h.update(&v6ip[..i + 2]); + cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + } + } + } + + { + let mut h5 = hasher.clone(); + h5.update(&format!("{} {}", self.addr, hex::encode(self.id)).into_bytes()[..]); + cost[32..40].copy_from_slice(&h5.finalize()[..8]); + } + + cost + } +} + +struct BasaltSlot { + seed: Seed, + peer: Option, +} + +impl BasaltSlot { + fn cost(&self) -> Cost { + self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST) + } +} + +struct BasaltView { + i_reset: usize, + slots: Vec, +} + +impl BasaltView { + fn new(size: usize) -> Self { + let slots = (0..size) + .map(|_| BasaltSlot { + seed: rand_seed(), + peer: None, + }) + .collect::>(); + Self { i_reset: 0, slots } + } + + fn current_peers(&self) -> HashSet { + self.slots + .iter() + .filter(|s| s.peer.is_some()) + .map(|s| s.peer.unwrap().clone()) + .collect::>() + } + fn current_peers_vec(&self) -> Vec { + self.current_peers().drain().collect::>() + } + + fn sample(&self, count: usize) -> Vec { + let possibles = self + .slots + .iter() + .enumerate() + .filter(|(_i, s)| s.peer.is_some()) + .map(|(i, _s)| i) + .collect::>(); + if possibles.len() == 0 { + vec![] + } else { + let mut ret = vec![]; + let mut rng = thread_rng(); + for _i in 0..count { + let idx = rng.gen_range(0, possibles.len()); + ret.push(self.slots[possibles[idx]].peer.unwrap()); + } + ret + } + } + + fn update_slot(&mut self, i: usize, peers: &[Peer]) { + let mut slot_cost = self.slots[i].cost(); + + for peer in peers.iter() { + let peer_cost = peer.cost(&self.slots[i].seed); + if self.slots[i].peer.is_none() || peer_cost < slot_cost { + trace!( + "Best match for slot {}: {}@{} (cost {})", + i, + hex::encode(peer.id), + peer.addr, + hex::encode(peer_cost) + ); + self.slots[i].peer = Some(*peer); + slot_cost = peer_cost; + } + } + } + fn update_all_slots(&mut self, peers: &[Peer]) { + for i in 0..self.slots.len() { + self.update_slot(i, peers); + } + } + + fn disconnected(&mut self, id: NodeID) { + let mut cleared_slots = vec![]; + for i in 0..self.slots.len() { + if let Some(p) = self.slots[i].peer { + if p.id == id { + self.slots[i].peer = None; + cleared_slots.push(i); + } + } + } + + let remaining_peers = self.current_peers_vec(); + + for i in cleared_slots { + self.update_slot(i, &remaining_peers[..]); + } + } + + fn should_try_list(&self, peers: &[Peer]) -> Vec { + // Select peers that have lower cost than any of our slots + let mut ret = HashSet::new(); + + for i in 0..self.slots.len() { + if self.slots[i].peer.is_none() { + return peers.to_vec(); + } + let mut min_cost = self.slots[i].cost(); + let mut min_peer = None; + for peer in peers.iter() { + if ret.contains(peer) { + continue; + } + let peer_cost = peer.cost(&self.slots[i].seed); + if peer_cost < min_cost { + min_cost = peer_cost; + min_peer = Some(*peer); + } + } + if let Some(p) = min_peer { + ret.insert(p); + if ret.len() == peers.len() { + break; + } + } + } + + ret.drain().collect::>() + } + + fn reset_some_slots(&mut self, count: usize) { + for _i in 0..count { + trace!("Reset slot {}", self.i_reset); + self.slots[self.i_reset].seed = rand_seed(); + self.i_reset = (self.i_reset + 1) % self.slots.len(); + } + } +} + +pub struct BasaltParams { + pub view_size: usize, + pub cache_size: usize, + pub exchange_interval: Duration, + pub reset_interval: Duration, + pub reset_count: usize, +} + +pub struct Basalt { + netapp: Arc, + + param: BasaltParams, + bootstrap_peers: Vec, + + view: RwLock, + current_attempts: RwLock>, + backlog: RwLock>, +} + +impl Basalt { + pub fn new( + netapp: Arc, + bootstrap_list: Vec<(NodeID, SocketAddr)>, + param: BasaltParams, + ) -> Arc { + let bootstrap_peers = bootstrap_list + .iter() + .map(|(id, addr)| Peer { + id: *id, + addr: *addr, + }) + .collect::>(); + + let view = BasaltView::new(param.view_size); + let backlog = LruCache::new(param.cache_size); + + let basalt = Arc::new(Self { + netapp: netapp.clone(), + param, + bootstrap_peers, + view: RwLock::new(view), + current_attempts: RwLock::new(HashSet::new()), + backlog: RwLock::new(backlog), + }); + + let basalt2 = basalt.clone(); + netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { + basalt2.on_connected(id, addr, is_incoming); + }); + + let basalt2 = basalt.clone(); + netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { + basalt2.on_disconnected(id, is_incoming); + }); + + let basalt2 = basalt.clone(); + netapp.add_msg_handler::(move |_from: NodeID, _pullmsg: PullMessage| { + let push_msg = basalt2.make_push_message(); + async move { push_msg } + }); + + let basalt2 = basalt.clone(); + netapp.add_msg_handler::(move |_from: NodeID, push_msg: PushMessage| { + basalt2.handle_peer_list(&push_msg.peers[..]); + async move { () } + }); + + basalt + } + + pub fn sample(&self, count: usize) -> Vec { + self.view + .read() + .unwrap() + .sample(count) + .iter() + .map(|p| { + debug!("KYEV S {}", hex::encode(p.id)); + p.id + }) + .collect::>() + } + + pub async fn run(self: Arc) { + for peer in self.bootstrap_peers.iter() { + tokio::spawn(self.clone().try_connect(*peer)); + } + + let pushpull_loop = self.clone().run_pushpull_loop(); + let reset_loop = self.run_reset_loop(); + tokio::join!(pushpull_loop, reset_loop); + } + + async fn run_pushpull_loop(self: Arc) { + loop { + tokio::time::delay_for(self.param.exchange_interval).await; + + let peers = self.view.read().unwrap().sample(2); + if peers.len() == 2 { + tokio::spawn(self.clone().do_pull(peers[0].id)); + tokio::spawn(self.clone().do_push(peers[1].id)); + } + } + } + + async fn do_pull(self: Arc, peer: NodeID) { + match self + .netapp + .request(&peer, PullMessage {}, PRIO_NORMAL) + .await + { + Ok(resp) => { + self.handle_peer_list(&resp.peers[..]); + trace!("KYEV PEXi {}", hex::encode(peer)); + } + Err(e) => { + warn!("Error during pull exchange: {}", e); + } + }; + } + + async fn do_push(self: Arc, peer: NodeID) { + let push_msg = self.make_push_message(); + match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { + Ok(_) => { + trace!("KYEV PEXo {}", hex::encode(peer)); + } + Err(e) => { + warn!("Error during push exchange: {}", e); + } + } + } + + fn make_push_message(&self) -> PushMessage { + let current_peers = self.view.read().unwrap().current_peers_vec(); + PushMessage { + peers: current_peers, + } + } + + async fn run_reset_loop(self: Arc) { + loop { + tokio::time::delay_for(self.param.reset_interval).await; + + { + debug!("KYEV R {}", self.param.reset_count); + + let mut view = self.view.write().unwrap(); + let prev_peers = view.current_peers(); + let prev_peers_vec = prev_peers.iter().cloned().collect::>(); + + view.reset_some_slots(self.param.reset_count); + view.update_all_slots(&prev_peers_vec[..]); + + let new_peers = view.current_peers(); + drop(view); + + self.close_all_diff(&prev_peers, &new_peers); + } + + let mut to_retry_maybe = self.bootstrap_peers.clone(); + for (peer, _) in self.backlog.read().unwrap().iter() { + if !self.bootstrap_peers.contains(peer) { + to_retry_maybe.push(*peer); + } + } + self.handle_peer_list(&to_retry_maybe[..]); + } + } + + fn handle_peer_list(self: &Arc, peers: &[Peer]) { + let to_connect = self.view.read().unwrap().should_try_list(peers); + + for peer in to_connect.iter() { + tokio::spawn(self.clone().try_connect(*peer)); + } + } + + async fn try_connect(self: Arc, peer: Peer) { + { + let view = self.view.read().unwrap(); + let mut attempts = self.current_attempts.write().unwrap(); + + if view.slots.iter().any(|x| x.peer == Some(peer)) { + return; + } + if attempts.contains(&peer) { + return; + } + + attempts.insert(peer); + } + let res = self.netapp.clone().try_connect(peer.addr, peer.id).await; + trace!("Connection attempt to {}: {:?}", peer.addr, res); + + self.current_attempts.write().unwrap().remove(&peer); + + if res.is_err() { + self.backlog.write().unwrap().pop(&peer); + } + } + + fn on_connected(self: &Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { + if is_incoming { + self.handle_peer_list(&[Peer { id, addr }][..]); + } else { + info!("KYEV C {} {}", hex::encode(id), addr); + let peer = Peer { id, addr }; + + let mut backlog = self.backlog.write().unwrap(); + if backlog.get(&peer).is_none() { + backlog.put(peer, ()); + } + drop(backlog); + + let mut view = self.view.write().unwrap(); + let prev_peers = view.current_peers(); + + view.update_all_slots(&[peer][..]); + + let new_peers = view.current_peers(); + drop(view); + + self.close_all_diff(&prev_peers, &new_peers); + } + } + + fn on_disconnected(&self, id: NodeID, is_incoming: bool) { + if !is_incoming { + info!("KYEV D {}", hex::encode(id)); + self.view.write().unwrap().disconnected(id); + } + } + + fn close_all_diff(&self, prev_peers: &HashSet, new_peers: &HashSet) { + for peer in prev_peers.iter() { + if !new_peers.contains(peer) { + self.netapp.disconnect(&peer.id); + } + } + } +} + +fn rand_seed() -> Seed { + let mut seed = [0u8; 32]; + sodiumoxide::randombytes::randombytes_into(&mut seed[..]); + seed +} diff --git a/src/peering/mod.rs b/src/peering/mod.rs index 044b1df..15b870a 100644 --- a/src/peering/mod.rs +++ b/src/peering/mod.rs @@ -1 +1,2 @@ pub mod fullmesh; +pub mod basalt;