commit 878414bfb7fc227a85589617bf433ce4adc0ebbe Author: Alex Auvolat Date: Thu Mar 9 10:42:11 2023 +0100 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..acbc162 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,317 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + +[[package]] +name = "indexmap" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" +dependencies = [ + "autocfg", + "hashbrown", +] + +[[package]] +name = "libc" +version = "0.2.139" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "pretty_env_logger" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d" +dependencies = [ + "env_logger", + "log", +] + +[[package]] +name = "proc-macro2" +version = "1.0.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "regex" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" + +[[package]] +name = "serde" +version = "1.0.154" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cdd151213925e7f1ab45a9bbfb129316bd00799784b174b7cc7bcd16961c49e" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.154" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fc80d722935453bcafdc2c9a73cd6fac4dc1938f0346035d84bf99fa9e33217" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_spanned" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0efd8caf556a6cebd3b285caf480045fcc1ac04f6bd786b09a6f11af30c4fcf4" +dependencies = [ + "serde", +] + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "termcolor" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "toml" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7afcae9e3f0fe2c370fd4657108972cbb2fa9db1b9f84849cefd80741b01cb6" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.19.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a1eb0622d28f4b9c90adc4ea4b2b46b47663fde9ac5fafcb14a1369d5508825" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + +[[package]] +name = "unicode-ident" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" + +[[package]] +name = "wgautomesh" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "log", + "pretty_env_logger", + "serde", + "toml", + "xxhash-rust", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "winnow" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee7b2c67f962bf5042bfd8b6a916178df33a26eec343ae064cb8e069f638fa6f" +dependencies = [ + "memchr", +] + +[[package]] +name = "xxhash-rust" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "735a71d46c4d68d71d4b24d03fdc2b98e38cea81730595801db779c04fe80d70" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..319862c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "wgautomesh" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } +anyhow = "1.0" +log = "0.4" +pretty_env_logger = "0.4.0" + +serde = { version = "1.0", features = ["derive"] } +bincode = "1.3" +toml = { version = "0.7", default-features = false, features = ["parse"] } diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ec8cfaf --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +all: + RUST_LOG=wgautomesh=debug cargo run -- config.toml diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..568d364 --- /dev/null +++ b/config.toml @@ -0,0 +1,2 @@ +interface = "route48" +gossip_port = 1134 diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..766fb20 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,382 @@ +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::process::Command; +use std::sync::Mutex; +use std::thread; +use std::time::Duration; + +use anyhow::{bail, Result}; +use log::*; +use serde::{Deserialize, Serialize}; + +/// Keep at most this many addresses for each peer +const KEEP_MAX_ADDRESSES: usize = 5; +/// Number of peers to gossip with +const GOSSIP_PEERS: usize = 10; + +/// Interval at which to try new addresses when disconnected (1 minute) +const TRY_INTERVAL: Duration = Duration::from_secs(60); +/// Time before a peer is considered dead (5 minutes) +const TIMEOUT: Duration = Duration::from_secs(300); +/// Interval at which to gossip last_seen info +const GOSSIP_INTERVAL: Duration = Duration::from_secs(300); + +type Pubkey = String; + +#[derive(Deserialize)] +struct Config { + /// The Wireguard interface name + interface: Pubkey, + /// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes) + gossip_port: u16, + /// The list of peers we try to connect to + #[serde(default)] + peers: Vec, +} + +#[derive(Deserialize)] +struct Peer { + /// The peer's Wireguard public key + pubkey: Pubkey, + /// The peer's Wireguard address + address: IpAddr, + /// An optionnal Wireguard endpoint used to initialize a connection to this peer + endpoint: Option, +} + +fn main() -> Result<()> { + pretty_env_logger::init(); + + let args: Vec = std::env::args().collect(); + + let config_path = match args.len() { + 0 | 1 => "/etc/wgautomesh.toml", + 2 => &args[1], + _ => bail!( + "Usage: {} [path_to_config_file]", + args.get(0).map(String::as_str).unwrap_or("wgautomesh") + ), + }; + + let config: Config = { + let config_str = std::fs::read_to_string(config_path)?; + toml::from_str(&config_str)? + }; + + Daemon::new(config)?.run() +} + +// ============ UTIL ================= + +fn time() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() +} + +fn fasthash(data: &[u8]) -> u64 { + use xxhash_rust::xxh3::Xxh3; + + let mut h = Xxh3::new(); + h.update(data); + h.digest() +} + +fn wg_dump(config: &Config) -> Result<(Pubkey, Vec<(Pubkey, Option, u64)>)> { + let output = Command::new("sudo") + .args(["wg", "show", &config.interface, "dump"]) + .output()?; + let mut lines = std::str::from_utf8(&output.stdout)?.split('\n'); + + let ourself = lines.next().unwrap().split('\t').collect::>(); + let our_pubkey = ourself[1].to_string(); + + let peers = lines + .filter_map(|line| { + let fields = line.split('\t').collect::>(); + if fields.len() < 5 { + None + } else { + Some(( + fields[0].to_string(), + fields[2].parse::().ok(), + fields[4].parse::().unwrap(), + )) + } + }) + .collect::>(); + + Ok((our_pubkey, peers)) +} + +// ============ DAEMON CODE ================= + +struct Daemon { + config: Config, + ourself: Pubkey, + socket: UdpSocket, + state: Mutex, +} + +struct PeerInfo { + endpoint: Option, + last_seen: u64, + gossip_ip: IpAddr, + gossip_prio: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +enum Gossip { + Announce { + pubkey: Pubkey, + endpoints: Vec<(SocketAddr, u64)>, + }, + Request, +} + +impl Daemon { + fn new(config: Config) -> Result { + let (ourself, _peers) = wg_dump(&config)?; + let socket = UdpSocket::bind(SocketAddr::new("0.0.0.0".parse()?, config.gossip_port))?; + Ok(Daemon { + config, + ourself, + socket, + state: Mutex::new(State { + peers: HashMap::new(), + gossip: HashMap::new(), + }), + }) + } + + fn run(&self) -> Result<()> { + let request = bincode::serialize(&Gossip::Request)?; + for peer in self.config.peers.iter() { + let addr = SocketAddr::new(peer.address, self.config.gossip_port); + self.socket.send_to(&request, addr)?; + } + + thread::scope(|s| { + s.spawn(|| self.wg_loop()); + s.spawn(|| self.recv_loop()); + }); + unreachable!() + } + + fn wg_loop(&self) -> ! { + let mut i = 0; + loop { + if let Err(e) = self.wg_loop_iter(i) { + error!("Wg loop error: {}", e); + } + i = i + 1; + std::thread::sleep(TRY_INTERVAL); + } + } + + fn wg_loop_iter(&self, i: usize) -> Result<()> { + let (_, wg_peers) = wg_dump(&self.config)?; + let mut state = self.state.lock().unwrap(); + + // 1. Update local peers info of peers + for (pk, endpoint, last_seen) in wg_peers { + match state.peers.get_mut(&pk) { + Some(i) => { + i.endpoint = endpoint; + i.last_seen = last_seen; + } + None => { + let gossip_ip = match self.config.peers.iter().find(|x| x.pubkey == pk) { + Some(x) => x.address, + None => continue, + }; + let gossip_prio = fasthash(format!("{}-{}", self.ourself, pk).as_bytes()); + state.peers.insert( + pk, + PeerInfo { + endpoint, + gossip_prio, + gossip_ip, + last_seen, + }, + ); + } + } + } + + // 2. Send gossip for peers where there is a big update + let announces = state + .peers + .iter() + .filter_map(|(pk, info)| info.endpoint.map(|ip| (pk, ip, info.last_seen))) + .filter(|(pk, ip, last_seen)| { + !state + .gossip + .get(pk.as_str()) + .unwrap_or(&vec![]) + .iter() + .any(|(a, t)| a == ip && *last_seen > t + GOSSIP_INTERVAL.as_secs()) + }) + .map(|(pk, ip, last_seen)| (pk.to_string(), vec![(ip, last_seen)])) + .collect::>(); + + for (pubkey, endpoints) in announces { + state.handle_announce(self, pubkey, endpoints)?; + } + + // 3. Try new address for disconnected peers + let now = time(); + for peer in self.config.peers.iter() { + // Skip peer if it is in connected state + if state + .peers + .get(&peer.pubkey) + .map(|x| now < x.last_seen + TIMEOUT.as_secs()) + .unwrap_or(false) + { + continue; + } + let mut endpoints = state.gossip.get(&peer.pubkey).cloned().unwrap_or_default(); + if endpoints.is_empty() { + if let Some(endpoint) = peer.endpoint { + endpoints.push((endpoint, 0)); + } + } + endpoints.sort(); + if !endpoints.is_empty() { + let endpoint = endpoints[i % endpoints.len()]; + info!("Configure {} with endpoint {}", peer.pubkey, endpoint.0); + Command::new("sudo") + .args([ + "wg", + "set", + &self.config.interface, + "peer", + &peer.pubkey, + "endpoint", + &endpoint.0.to_string(), + "persistent-keepalive", + "20", + ]) + .output()?; + } + } + + Ok(()) + } + + fn recv_loop(&self) -> ! { + loop { + if let Err(e) = self.recv_loop_iter() { + error!("Receive loop error: {}", e); + std::thread::sleep(Duration::from_secs(10)); + } + } + } + + fn recv_loop_iter(&self) -> Result<()> { + let (from, gossip) = self.recv_gossip()?; + let mut state = self.state.lock().unwrap(); + match gossip { + Gossip::Announce { pubkey, endpoints } => { + state.handle_announce(self, pubkey, endpoints)?; + } + Gossip::Request => { + for (pubkey, endpoints) in state.gossip.iter() { + let packet = bincode::serialize(&Gossip::Announce { + pubkey: pubkey.clone(), + endpoints: endpoints.clone(), + })?; + self.socket.send_to(&packet, from)?; + } + } + } + Ok(()) + } + + fn recv_gossip(&self) -> Result<(SocketAddr, Gossip)> { + let mut buf = vec![0u8; 1500]; + let (amt, src) = self.socket.recv_from(&mut buf)?; + if !self.config.peers.iter().any(|x| x.address == src.ip()) { + bail!("Received message from unexpected peer: {}", src); + } + let gossip = bincode::deserialize(&buf[..amt])?; + debug!("RECV {}\t{:?}", src, gossip); + Ok((src, gossip)) + } +} + +struct State { + peers: HashMap, + gossip: HashMap>, +} + +impl State { + fn send_gossip(&self, daemon: &Daemon, gossip: Gossip) -> Result<()> { + let packet = bincode::serialize(&gossip)?; + + let now = time(); + + let mut peer_vec = self + .peers + .iter() + .filter(|(_, info)| now < info.last_seen + TIMEOUT.as_secs() && info.endpoint.is_some()) + .map(|(_, info)| (info.gossip_ip, info.gossip_prio)) + .collect::>(); + peer_vec.sort_by_key(|(_, prio)| *prio); + + for (gossip_ip, _) in peer_vec.into_iter().take(GOSSIP_PEERS) { + let addr = SocketAddr::new(gossip_ip, daemon.config.gossip_port); + debug!("SEND {}\t{:?}", addr, gossip); + daemon.socket.send_to(&packet, addr)?; + } + + Ok(()) + } + + fn handle_announce( + &mut self, + daemon: &Daemon, + pubkey: Pubkey, + mut endpoints: Vec<(SocketAddr, u64)>, + ) -> Result<()> { + let propagate = { + match self.gossip.get_mut(&pubkey) { + Some(existing) => { + let mut has_new = false; + for (new_addr, new_t) in endpoints { + if !existing + .iter() + .any(|(addr, t)| *addr == new_addr && *t >= new_t) + { + existing.retain(|(addr, _)| *addr != new_addr); + existing.push((new_addr, new_t)); + has_new = true; + } + } + if has_new { + existing.sort_by_key(|(_, t)| *t); + existing.truncate(KEEP_MAX_ADDRESSES); + Some(Gossip::Announce { + pubkey, + endpoints: existing.clone(), + }) + } else { + None + } + } + None => { + endpoints.truncate(KEEP_MAX_ADDRESSES); + self.gossip.insert(pubkey.clone(), endpoints.clone()); + Some(Gossip::Announce { pubkey, endpoints }) + } + } + }; + if let Some(propagate) = propagate { + info!("Propagating announce: {:?}", propagate); + self.send_gossip(daemon, propagate)?; + } + Ok(()) + } +}