From b7a1b4d4546aed80de38c54eb83ad940e21b0d84 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Mar 2023 13:31:31 +0100 Subject: [PATCH] automatic discovery of nodes on the same LAN (fix #2) --- src/main.rs | 135 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 98 insertions(+), 37 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6eb5171..de2e9c8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,6 +21,8 @@ const TIMEOUT: Duration = Duration::from_secs(300); /// Interval at which to gossip last_seen info const GOSSIP_INTERVAL: Duration = Duration::from_secs(300); +const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60); + const IGD_INTERVAL: Duration = Duration::from_secs(60); const IGD_LEASE_DURATION: Duration = Duration::from_secs(300); @@ -30,10 +32,16 @@ type Pubkey = String; struct Config { /// The Wireguard interface name interface: Pubkey, - /// Forward an external port to Wiregard using UPnP IGD - upnp_forward_external_port: Option, /// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes) gossip_port: u16, + + /// Enable LAN discovery + #[serde(default)] + lan_discovery: bool, + + /// Forward an external port to Wiregard using UPnP IGD + upnp_forward_external_port: Option, + /// The list of peers we try to connect to #[serde(default)] peers: Vec, @@ -134,6 +142,7 @@ struct Daemon { struct PeerInfo { endpoint: Option, + lan_endpoint: Option<(SocketAddr, u64)>, last_seen: u64, gossip_ip: IpAddr, gossip_prio: u64, @@ -146,6 +155,10 @@ enum Gossip { endpoints: Vec<(SocketAddr, u64)>, }, Request, + LanBroadcast { + pubkey: Pubkey, + listen_port: u16, + } } impl Daemon { @@ -180,6 +193,7 @@ impl Daemon { thread::scope(|s| { s.spawn(|| self.wg_loop()); s.spawn(|| self.recv_loop()); + s.spawn(|| self.lan_broadcast_loop()); s.spawn(|| self.igd_loop()); }); unreachable!() @@ -197,34 +211,10 @@ impl Daemon { } fn wg_loop_iter(&self, i: usize) -> Result<()> { - let (_, _, wg_peers) = wg_dump(&self.config)?; let mut state = self.state.lock().unwrap(); // 1. Update local peers info of peers - for (pk, endpoint, last_seen) in wg_peers { - match state.peers.get_mut(&pk) { - Some(i) => { - i.endpoint = endpoint; - i.last_seen = last_seen; - } - None => { - let gossip_ip = match self.config.peers.iter().find(|x| x.pubkey == pk) { - Some(x) => x.address, - None => continue, - }; - let gossip_prio = fasthash(format!("{}-{}", self.our_pubkey, pk).as_bytes()); - state.peers.insert( - pk, - PeerInfo { - endpoint, - gossip_prio, - gossip_ip, - last_seen, - }, - ); - } - } - } + state.read_wg_peers(self)?; // 2. Send gossip for peers where there is a big update let announces = state @@ -247,7 +237,7 @@ impl Daemon { } // 3. Try new address for disconnected peers - state.setup_wg_peers(&self, i)?; + state.setup_wg_peers(self, i)?; Ok(()) } @@ -277,6 +267,13 @@ impl Daemon { self.socket.send_to(&packet, from)?; } } + Gossip::LanBroadcast{ pubkey, listen_port } => { + if self.config.lan_discovery { + if let Some(peer) = state.peers.get_mut(&pubkey) { + peer.lan_endpoint = Some((SocketAddr::new(from.ip(), listen_port), time())); + } + } + } } Ok(()) } @@ -292,6 +289,27 @@ impl Daemon { Ok((src, gossip)) } + fn lan_broadcast_loop(&self) { + if self.config.lan_discovery { + loop { + if let Err(e) = self.lan_broadcast_iter() { + error!("LAN broadcast loop error: {}", e); + } + std::thread::sleep(LAN_BROADCAST_INTERVAL); + } + } + } + + fn lan_broadcast_iter(&self) -> Result<()> { + let packet = bincode::serialize(&Gossip::LanBroadcast { + pubkey: self.our_pubkey.clone(), + listen_port: self.listen_port, + })?; + let addr = SocketAddr::new("255.255.255.255".parse().unwrap(), self.config.gossip_port); + self.socket.send_to(&packet, addr)?; + Ok(()) + } + fn igd_loop(&self) { if let Some(external_port) = self.config.upnp_forward_external_port { loop { @@ -401,6 +419,7 @@ impl State { } } None => { + endpoints.sort_by_key(|(_, t)| -(*t as i64)); endpoints.truncate(KEEP_MAX_ADDRESSES); self.gossip.insert(pubkey.clone(), endpoints.clone()); Some(Gossip::Announce { pubkey, endpoints }) @@ -414,6 +433,37 @@ impl State { Ok(()) } + fn read_wg_peers(&mut self, daemon: &Daemon) -> Result<()> { + let (_, _, wg_peers) = wg_dump(&daemon.config)?; + for (pk, endpoint, last_seen) in wg_peers { + match self.peers.get_mut(&pk) { + Some(i) => { + i.endpoint = endpoint; + i.last_seen = last_seen; + } + None => { + let gossip_ip = match daemon.config.peers.iter().find(|x| x.pubkey == pk) { + Some(x) => x.address, + None => continue, + }; + let gossip_prio = fasthash(format!("{}-{}", daemon.our_pubkey, pk).as_bytes()); + self.peers.insert( + pk, + PeerInfo { + endpoint, + lan_endpoint: None, + gossip_prio, + gossip_ip, + last_seen, + }, + ); + } + } + } + + Ok(()) + } + fn setup_wg_peers(&self, daemon: &Daemon, i: usize) -> Result<()> { let now = time(); for peer in daemon.config.peers.iter() { @@ -442,18 +492,28 @@ impl State { } // For disconnected peers, cycle through the IP addresses that we know of - let mut endpoints = self.gossip.get(&peer.pubkey).cloned().unwrap_or_default(); - if let Some(endpoint) = &peer.endpoint { - match endpoint.to_socket_addrs() { - Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e), - Ok(iter) => { - for addr in iter { - endpoints.push((addr, 0)); + let lan_endpoint = self.peers.get(&peer.pubkey) + .and_then(|peer| peer.lan_endpoint) + .filter(|(_, t)| time() < t + TIMEOUT.as_secs()); + + let endpoints = match lan_endpoint { + Some(endpoint) => vec![endpoint], + None => { + let mut endpoints = self.gossip.get(&peer.pubkey).cloned().unwrap_or_default(); + if let Some(endpoint) = &peer.endpoint { + match endpoint.to_socket_addrs() { + Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e), + Ok(iter) => { + for addr in iter { + endpoints.push((addr, 0)); + } + } } } + endpoints.sort(); + endpoints } - } - endpoints.sort(); + }; if !endpoints.is_empty() { let endpoint = endpoints[i % endpoints.len()]; @@ -486,6 +546,7 @@ impl State { .output()?; } } + Ok(()) } }