automatic discovery of nodes on the same LAN (fix #2)

This commit is contained in:
Alex 2023-03-09 13:31:31 +01:00
parent 67280bd861
commit b7a1b4d454

View file

@ -21,6 +21,8 @@ const TIMEOUT: Duration = Duration::from_secs(300);
/// Interval at which to gossip last_seen info /// Interval at which to gossip last_seen info
const GOSSIP_INTERVAL: Duration = Duration::from_secs(300); const GOSSIP_INTERVAL: Duration = Duration::from_secs(300);
const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60);
const IGD_INTERVAL: Duration = Duration::from_secs(60); const IGD_INTERVAL: Duration = Duration::from_secs(60);
const IGD_LEASE_DURATION: Duration = Duration::from_secs(300); const IGD_LEASE_DURATION: Duration = Duration::from_secs(300);
@ -30,10 +32,16 @@ type Pubkey = String;
struct Config { struct Config {
/// The Wireguard interface name /// The Wireguard interface name
interface: Pubkey, interface: Pubkey,
/// Forward an external port to Wiregard using UPnP IGD
upnp_forward_external_port: Option<u16>,
/// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes) /// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes)
gossip_port: u16, gossip_port: u16,
/// Enable LAN discovery
#[serde(default)]
lan_discovery: bool,
/// Forward an external port to Wiregard using UPnP IGD
upnp_forward_external_port: Option<u16>,
/// The list of peers we try to connect to /// The list of peers we try to connect to
#[serde(default)] #[serde(default)]
peers: Vec<Peer>, peers: Vec<Peer>,
@ -134,6 +142,7 @@ struct Daemon {
struct PeerInfo { struct PeerInfo {
endpoint: Option<SocketAddr>, endpoint: Option<SocketAddr>,
lan_endpoint: Option<(SocketAddr, u64)>,
last_seen: u64, last_seen: u64,
gossip_ip: IpAddr, gossip_ip: IpAddr,
gossip_prio: u64, gossip_prio: u64,
@ -146,6 +155,10 @@ enum Gossip {
endpoints: Vec<(SocketAddr, u64)>, endpoints: Vec<(SocketAddr, u64)>,
}, },
Request, Request,
LanBroadcast {
pubkey: Pubkey,
listen_port: u16,
}
} }
impl Daemon { impl Daemon {
@ -180,6 +193,7 @@ impl Daemon {
thread::scope(|s| { thread::scope(|s| {
s.spawn(|| self.wg_loop()); s.spawn(|| self.wg_loop());
s.spawn(|| self.recv_loop()); s.spawn(|| self.recv_loop());
s.spawn(|| self.lan_broadcast_loop());
s.spawn(|| self.igd_loop()); s.spawn(|| self.igd_loop());
}); });
unreachable!() unreachable!()
@ -197,34 +211,10 @@ impl Daemon {
} }
fn wg_loop_iter(&self, i: usize) -> Result<()> { fn wg_loop_iter(&self, i: usize) -> Result<()> {
let (_, _, wg_peers) = wg_dump(&self.config)?;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
// 1. Update local peers info of peers // 1. Update local peers info of peers
for (pk, endpoint, last_seen) in wg_peers { state.read_wg_peers(self)?;
match state.peers.get_mut(&pk) {
Some(i) => {
i.endpoint = endpoint;
i.last_seen = last_seen;
}
None => {
let gossip_ip = match self.config.peers.iter().find(|x| x.pubkey == pk) {
Some(x) => x.address,
None => continue,
};
let gossip_prio = fasthash(format!("{}-{}", self.our_pubkey, pk).as_bytes());
state.peers.insert(
pk,
PeerInfo {
endpoint,
gossip_prio,
gossip_ip,
last_seen,
},
);
}
}
}
// 2. Send gossip for peers where there is a big update // 2. Send gossip for peers where there is a big update
let announces = state let announces = state
@ -247,7 +237,7 @@ impl Daemon {
} }
// 3. Try new address for disconnected peers // 3. Try new address for disconnected peers
state.setup_wg_peers(&self, i)?; state.setup_wg_peers(self, i)?;
Ok(()) Ok(())
} }
@ -277,6 +267,13 @@ impl Daemon {
self.socket.send_to(&packet, from)?; self.socket.send_to(&packet, from)?;
} }
} }
Gossip::LanBroadcast{ pubkey, listen_port } => {
if self.config.lan_discovery {
if let Some(peer) = state.peers.get_mut(&pubkey) {
peer.lan_endpoint = Some((SocketAddr::new(from.ip(), listen_port), time()));
}
}
}
} }
Ok(()) Ok(())
} }
@ -292,6 +289,27 @@ impl Daemon {
Ok((src, gossip)) Ok((src, gossip))
} }
fn lan_broadcast_loop(&self) {
if self.config.lan_discovery {
loop {
if let Err(e) = self.lan_broadcast_iter() {
error!("LAN broadcast loop error: {}", e);
}
std::thread::sleep(LAN_BROADCAST_INTERVAL);
}
}
}
fn lan_broadcast_iter(&self) -> Result<()> {
let packet = bincode::serialize(&Gossip::LanBroadcast {
pubkey: self.our_pubkey.clone(),
listen_port: self.listen_port,
})?;
let addr = SocketAddr::new("255.255.255.255".parse().unwrap(), self.config.gossip_port);
self.socket.send_to(&packet, addr)?;
Ok(())
}
fn igd_loop(&self) { fn igd_loop(&self) {
if let Some(external_port) = self.config.upnp_forward_external_port { if let Some(external_port) = self.config.upnp_forward_external_port {
loop { loop {
@ -401,6 +419,7 @@ impl State {
} }
} }
None => { None => {
endpoints.sort_by_key(|(_, t)| -(*t as i64));
endpoints.truncate(KEEP_MAX_ADDRESSES); endpoints.truncate(KEEP_MAX_ADDRESSES);
self.gossip.insert(pubkey.clone(), endpoints.clone()); self.gossip.insert(pubkey.clone(), endpoints.clone());
Some(Gossip::Announce { pubkey, endpoints }) Some(Gossip::Announce { pubkey, endpoints })
@ -414,6 +433,37 @@ impl State {
Ok(()) Ok(())
} }
fn read_wg_peers(&mut self, daemon: &Daemon) -> Result<()> {
let (_, _, wg_peers) = wg_dump(&daemon.config)?;
for (pk, endpoint, last_seen) in wg_peers {
match self.peers.get_mut(&pk) {
Some(i) => {
i.endpoint = endpoint;
i.last_seen = last_seen;
}
None => {
let gossip_ip = match daemon.config.peers.iter().find(|x| x.pubkey == pk) {
Some(x) => x.address,
None => continue,
};
let gossip_prio = fasthash(format!("{}-{}", daemon.our_pubkey, pk).as_bytes());
self.peers.insert(
pk,
PeerInfo {
endpoint,
lan_endpoint: None,
gossip_prio,
gossip_ip,
last_seen,
},
);
}
}
}
Ok(())
}
fn setup_wg_peers(&self, daemon: &Daemon, i: usize) -> Result<()> { fn setup_wg_peers(&self, daemon: &Daemon, i: usize) -> Result<()> {
let now = time(); let now = time();
for peer in daemon.config.peers.iter() { for peer in daemon.config.peers.iter() {
@ -442,6 +492,13 @@ impl State {
} }
// For disconnected peers, cycle through the IP addresses that we know of // For disconnected peers, cycle through the IP addresses that we know of
let lan_endpoint = self.peers.get(&peer.pubkey)
.and_then(|peer| peer.lan_endpoint)
.filter(|(_, t)| time() < t + TIMEOUT.as_secs());
let endpoints = match lan_endpoint {
Some(endpoint) => vec![endpoint],
None => {
let mut endpoints = self.gossip.get(&peer.pubkey).cloned().unwrap_or_default(); let mut endpoints = self.gossip.get(&peer.pubkey).cloned().unwrap_or_default();
if let Some(endpoint) = &peer.endpoint { if let Some(endpoint) = &peer.endpoint {
match endpoint.to_socket_addrs() { match endpoint.to_socket_addrs() {
@ -454,6 +511,9 @@ impl State {
} }
} }
endpoints.sort(); endpoints.sort();
endpoints
}
};
if !endpoints.is_empty() { if !endpoints.is_empty() {
let endpoint = endpoints[i % endpoints.len()]; let endpoint = endpoints[i % endpoints.len()];
@ -486,6 +546,7 @@ impl State {
.output()?; .output()?;
} }
} }
Ok(()) Ok(())
} }
} }