Compare commits

...

10 commits

Author SHA1 Message Date
Lyn
53b7a6b6ed made IGD implementation compatible with IPv6 implementation, updated documentation for config structure changes 2025-01-14 15:07:07 +01:00
Lyn
69c175059c int overflow no. 2 fixed 2025-01-14 13:55:22 +01:00
Lyn
464188f945 integer overflow fix 2025-01-13 16:44:47 +01:00
Lyn
3943972be3 Merge remote-tracking branch 'yuka/main'
# Conflicts:
#	Cargo.lock
#	Cargo.toml
#	src/main.rs
2025-01-11 13:41:56 +01:00
Lyn
f2ca3b6e6c added IPv6 support; rewrote the igd implementation 2025-01-11 12:33:36 +01:00
Yureka
e039471e82 update cargo dependencies 2024-05-22 12:01:02 +02:00
Yureka
503f94ceeb fix 2023-11-18 19:14:06 +01:00
Yureka
d33e26d5ce fix 2023-11-18 18:55:10 +01:00
Yureka
1680284107 set ttl 2023-11-18 17:41:40 +01:00
Yureka
ff581dff6f support multiple interfaces 2023-11-18 16:26:09 +01:00
9 changed files with 2152 additions and 840 deletions

811
Cargo.lock generated

File diff suppressed because it is too large Load diff

1604
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package] [package]
name = "wgautomesh" name = "wgautomesh"
version = "0.1.0" version = "0.1.1"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -11,11 +11,12 @@ anyhow = "1.0"
log = "0.4" log = "0.4"
pretty_env_logger = "0.5" pretty_env_logger = "0.5"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0.215", features = ["derive"] }
bincode = "1.3" bincode = "1.3"
toml = { version = "0.8", default-features = false, features = ["parse"] } toml = { version = "0.8", default-features = false, features = ["parse"] }
xsalsa20poly1305 = "0.9" xsalsa20poly1305 = "0.9"
blake3 = "1.5" blake3 = "1.5"
pnet = "0.35.0"
igd = { version = "0.12", default-features = false } rupnp = "2.0.0"
get_if_addrs = "0.5" tokio = { version = "1.41.1", features = ["rt", "rt-multi-thread", "macros"] }
futures = "0.3.31"

View file

@ -32,9 +32,6 @@ and closely mirror the structure of the configuration file described below.
### Sample configuration file ### Sample configuration file
```toml ```toml
# The Wireguard interface to control.
interface = "wg0"
# The port wgautomesh will use to communicate from node to node. Wgautomesh # The port wgautomesh will use to communicate from node to node. Wgautomesh
# gossip communications occur inside the wireguard mesh network. # gossip communications occur inside the wireguard mesh network.
gossip_port = 1666 gossip_port = 1666
@ -42,10 +39,6 @@ gossip_port = 1666
# Enable discovery of other wgautomesh nodes on the same LAN using UDP broadcast. # Enable discovery of other wgautomesh nodes on the same LAN using UDP broadcast.
lan_discovery = true lan_discovery = true
# Enables UPnP/IGD forwarding of an external port to the Wireguard listening port
# on this node, for compatible routers/gateways.
upnp_forward_external_port = 33723
# The path to a file that contains the encryption secret wgautomesh uses to # The path to a file that contains the encryption secret wgautomesh uses to
# communicate. This secret can be any arbitrary utf-8 string. The following # communicate. This secret can be any arbitrary utf-8 string. The following
# command can be used to generate a new secret: # command can be used to generate a new secret:
@ -58,27 +51,51 @@ gossip_secret_file = "/var/lib/wgautomesh/gossip_secret"
# `[[peers]]` section when trying to establish connectivity. # `[[peers]]` section when trying to establish connectivity.
persist_file = "/var/lib/wgautomesh/state" persist_file = "/var/lib/wgautomesh/state"
[[peers]] # Configuration for a wireguard interface
pubkey = "7Nm7pMmyS7Nts1MB+loyD8u84ODxHPTkDu+uqQR6yDk=" [[interfaces]]
address = "10.14.1.2" # Interface name
endpoint = "77.207.15.215:33722" name = "wg0"
# Enables UPnP/IGD forwarding of an external port to the Wireguard listening port
# on this interface, for compatible routers/gateways.
# For IPv6 only interfaces, just assign any port here to enable IGD since the Wireguard
# interfaces listen port will always be pinholed instead.
upnp_forward_external_port = 51820
[[peers]] [[peers]]
# The Wireguard interface to use to connect with this peer.
interface = "wg0"
# Pubkey of the other peer
pubkey = "7Nm7pMmyS7Nts1MB+loyD8u84ODxHPTkDu+uqQR6yDk="
# Mesh-internal wireguard address
address = "10.14.1.2"
# (Optional) endpoint address of this peer
endpoint = "77.207.15.215"
# (Optional) endpoint port
port = 33722
[[peers]]
interface = "wg0"
pubkey = "lABn/axzD1jkFulX8c+K3B3CbKXORlIMDDoe8sQVxhs=" pubkey = "lABn/axzD1jkFulX8c+K3B3CbKXORlIMDDoe8sQVxhs="
address = "10.14.1.3" address = "10.14.1.3"
endpoint = "77.207.15.215:33723" endpoint = "77.207.15.215"
port = 33723
[[peers]] [[peers]]
interface = "wg0"
pubkey = "XLOYoMXF+PO4jcgfSVAk+thh4VmWx0wzWnb0xs08G1s=" pubkey = "XLOYoMXF+PO4jcgfSVAk+thh4VmWx0wzWnb0xs08G1s="
address = "10.14.4.1" address = "10.14.4.1"
endpoint = "bitfrost.fiber.shirokumo.net:33734" endpoint = "bitfrost.fiber.shirokumo.net"
port = 33734
[[peers]] [[peers]]
interface = "wg0"
pubkey = "smBQYUS60JDkNoqkTT7TgbpqFiM43005fcrT6472llI=" pubkey = "smBQYUS60JDkNoqkTT7TgbpqFiM43005fcrT6472llI="
address = "10.14.2.33" address = "10.14.2.33"
endpoint = "82.64.238.84:33733" endpoint = "82.64.238.84"
port = 33733
[[peers]] [[peers]]
interface = "wg0"
pubkey = "m9rLf+233X1VColmeVrM/xfDGro5W6Gk5N0zqcf32WY=" pubkey = "m9rLf+233X1VColmeVrM/xfDGro5W6Gk5N0zqcf32WY="
address = "10.14.3.1" address = "10.14.3.1"
``` ```

View file

@ -1,2 +1,3 @@
interface = "route48"
gossip_port = 1134 gossip_port = 1134
[[interfaces]]
name = "wg0"

2
rust-toolchain.toml Normal file
View file

@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"

219
src/igd.rs Normal file
View file

@ -0,0 +1,219 @@
use std::collections::HashMap;
use log::*;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use futures::stream::{StreamExt};
use rupnp::{Device, Service};
use rupnp::ssdp::{SearchTarget, URN};
use std::str::FromStr;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result, Error};
const IGD_LEASE_DURATION: Duration = Duration::from_secs(300);
const WAN_IPV6_FIREWALL_CONTROL: URN = URN::service("schemas-upnp-org", "WANIPv6FirewallControl", 1);
const WAN_IP_CONNECTION: URN = URN::service("schemas-upnp-org", "WANIPConnection", 1);
pub async fn igd_loop_iter(portmap_v4:HashMap<u16,u16>,portmap_v6: Vec<u16>) -> Result<()> {
let lease_duration: u64 = IGD_LEASE_DURATION.as_secs();
//find gateway compatible with publishing the port for required IP version
let gateway = find_gateway(portmap_v6.len() >0).await?;
let gateway_ip: IpAddr = IpAddr::from_str(gateway.url().host().unwrap())?;
//find corresponding interface local IP to forward in gateway
let (local_ipv4, local_ipv6) = select_local_ip_for_gateway(gateway_ip)?;
debug!(
"Found gateway: {:?} at IP {}, making announce",
gateway.friendly_name(),
gateway.url().host().unwrap()
);
if local_ipv6.is_some() {
for listen_port in &portmap_v6 {
create_ipv6_firewall_pinhole(
&gateway,
&IpAddr::V6(local_ipv6.unwrap()),
listen_port,
&lease_duration,
)
.await?
}
}
if local_ipv4.is_some() {
for (listen_port, external_port) in &portmap_v4 {
create_ipv4_port_mapping(
&gateway,
&IpAddr::V4(local_ipv4.unwrap()),
listen_port,
external_port,
&lease_duration,
)
.await?
}
}
Ok(())
}
/// Create a port mapping to forward a given Port for a given internal IPv4
async fn create_ipv4_port_mapping(
gateway: &Device,
internal_ip: &IpAddr,
listen_port: &u16,
external_port: &u16,
lease_duration: &u64,
) -> Result<()> {
let wan_ip_con_service = gateway
.find_service(&WAN_IP_CONNECTION)
.expect("Gateway passed doesn't offer the required service to create IPv4 port mapping");
let arguments = format!(
"<NewRemoteHost/>
<NewExternalPort>{external_port}</NewExternalPort>
<NewProtocol>UDP</NewProtocol>
<NewInternalPort>{listen_port}</NewInternalPort>
<NewInternalClient>{internal_ip}</NewInternalClient>
<NewEnabled>1</NewEnabled>
<NewPortMappingDescription>Wireguard via wgautomesh</NewPortMappingDescription>
<NewLeaseDuration>{lease_duration}</NewLeaseDuration>"
);
debug!(
"Adding port mapping for internal IP {} on internal port {}, external port {}",
internal_ip, listen_port, external_port,
);
let result = wan_ip_con_service
.action(gateway.url(), "AddPortMapping", &arguments)
.await;
if result.is_ok() {
Ok(())
} else {
bail!(
"Error trying to add IPv4 port mapping: {}.\
Note: Have you checked whether your router allows this device to create (IPv4) port mappings?",
result.err().unwrap()
);
}
}
/// Create a pinhole for a given IPv6 address on a given port
async fn create_ipv6_firewall_pinhole(gateway: &Device,
ip: &IpAddr,
listen_port: &u16,
lease_duration: &u64,
) -> Result<()> {
let wan_ip6_fw_con = gateway
.find_service(&WAN_IPV6_FIREWALL_CONTROL)
.expect("Gateway passed doesn't offer the required service to create IPv6 pinholes");
let (firewall_enabled, can_create_inbound_pinhole) =
get_firewall_status(&gateway, &wan_ip6_fw_con).await;
if !firewall_enabled {
debug!("Gateway firewall is not enabled, incoming connections should be allowed as-is on all ports");
return Ok(());
} else if !can_create_inbound_pinhole {
bail!("Gateway said creating inbound IPv6 pinholes isn't allowed")
}
let arguments = format!(
"<RemoteHost/>
<RemotePort/>
<Protocol>17</Protocol>
<InternalPort>{listen_port}</InternalPort>
<InternalClient>{ip}</InternalClient>
<LeaseTime>{lease_duration}</LeaseTime>"
);
debug!(
"Opening firewall pinhole for IP {} on port {}",
ip, listen_port,
);
let result = wan_ip6_fw_con
.action(gateway.url(), "AddPinhole", &arguments)
.await;
if result.is_ok() {
Ok(())
} else {
bail!(
"Error trying to open IPv6 pinhole: {}\
Note: Have you checked whether your router allows this device to create (IPv6) pinholes?",
result.err().unwrap()
);
}
}
/// Asks the Gateway for the IPv6 Firewall status (whether the firewall is enabled AND whether devices in this network are allowed to create IPv6 pinholes per policy).
/// Note: This only works on IGDv2 supporting firewalls (-> any firewall that can do IPv6)
async fn get_firewall_status(gateway: &Device, igd_service: &Service) -> (bool, bool) {
let firewall_status_response = igd_service
.action(gateway.url(), "GetFirewallStatus", "")
.await
.unwrap();
let firewall_enabled: bool =
(u32::from_str(firewall_status_response.get("FirewallEnabled").unwrap()).unwrap()) != 0;
let can_create_inbound_pinhole: bool = u32::from_str(
firewall_status_response
.get("InboundPinholeAllowed")
.unwrap(),
)
.unwrap()
!= 0;
(firewall_enabled, can_create_inbound_pinhole)
}
/// Find a Gateway compatible with either IPv4 (supports WANIPConnection) or IPv6 (supports WANIPv6FirewallControl)
async fn find_gateway(ipv6_required: bool) -> Result<Device, Error> {
let search_urn: URN = if ipv6_required {
WAN_IPV6_FIREWALL_CONTROL
} else {
WAN_IP_CONNECTION
};
let discovered_devices = rupnp::discover(
&SearchTarget::URN(search_urn.clone()),
Duration::from_secs(3),
)
.await?
.filter_map(|result| async {
match result {
Ok(device) => Some(device),
Err(_) => None,
}
});
futures::pin_mut!(discovered_devices);
discovered_devices.next().await.ok_or_else(||anyhow!("Couldn't find any gateways supporting {}. Is port 1900 open for incoming connections from local networks?", search_urn.typ()))
}
/// Returns a list of IPs assigned to interfaces that are in the same subnet as a given IP
fn interface_ips_in_same_subnet_as(ip_to_match: IpAddr) -> Result<Vec<IpAddr>, Error> {
let interfaces = pnet::datalink::interfaces();
let ipnets = interfaces
.iter()
.filter_map(|interface| {
if interface
.ips
.iter()
.any(|ipnetwork| ipnetwork.contains(ip_to_match))
{
Some(interface.ips.clone())
} else {
None
}
})
.next()
.context("Couldn't find any local IPs within the same network as given IP")?;
let ips = ipnets.iter().map(|ip| ip.ip()).collect();
Ok(ips)
}
/// Selects the local IP we tell the Gateway to port forward to (/pinhole) later on
/// Note: As soon as this[https://github.com/jakobhellermann/ssdp-client/issues/11] is fixed and the dependency is upgraded in rupnp, we can simplify it
fn select_local_ip_for_gateway(gateway: IpAddr) -> Result<(Option<Ipv4Addr>,Option<Ipv6Addr>), Error> {
//get IPs in same subnet as Gateway
let ips = interface_ips_in_same_subnet_as(gateway)?;
//removes IPv6 that are not globally routable as pinholing those would be pointless
let v6_cleaned_up = ips
.iter()
.filter(|ip| ip.is_ipv4() || (ip.is_global() && ip.is_ipv6()));
//get the first IPv4 and IPv6 found
let mut first_ipv4 :Option<Ipv4Addr> = None;
let mut first_ipv6 :Option<Ipv6Addr> = None;
for ip in v6_cleaned_up {
match ip {
IpAddr::V4(ipv4) => { if first_ipv4.is_none() { first_ipv4 = Some(ipv4.clone()); } }
IpAddr::V6(ipv6) => { if first_ipv6.is_none() { first_ipv6 = Some(ipv6.clone()); } }
}
if first_ipv4.is_some() && first_ipv6.is_some() {
break;
}
}
if first_ipv4.is_none() && first_ipv6.is_none() {
bail!("Couldn't find any IP address")
}
Ok((first_ipv4, first_ipv6))
}

View file

@ -1,5 +1,8 @@
use std::collections::HashMap; #![feature(ip)]
use std::net::{IpAddr, SocketAddr, SocketAddrV4, ToSocketAddrs, UdpSocket}; mod igd;
use igd::*;
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr, ToSocketAddrs, UdpSocket};
use std::process::Command; use std::process::Command;
use std::sync::Mutex; use std::sync::Mutex;
use std::thread; use std::thread;
@ -26,14 +29,10 @@ const PERSIST_INTERVAL: Duration = Duration::from_secs(600);
const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60); const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60);
const IGD_INTERVAL: Duration = Duration::from_secs(60); const IGD_INTERVAL: Duration = Duration::from_secs(60);
const IGD_LEASE_DURATION: Duration = Duration::from_secs(300);
type Pubkey = String; type Pubkey = String;
#[derive(Deserialize)] #[derive(Deserialize)]
struct Config { struct Config {
/// The Wireguard interface name
interface: Pubkey,
/// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes) /// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes)
gossip_port: u16, gossip_port: u16,
/// The secret to use to authenticate nodes between them /// The secret to use to authenticate nodes between them
@ -46,23 +45,36 @@ struct Config {
#[serde(default)] #[serde(default)]
lan_discovery: bool, lan_discovery: bool,
/// Forward an external port to Wiregard using UPnP IGD
upnp_forward_external_port: Option<u16>,
/// The list of peers we try to connect to /// The list of peers we try to connect to
#[serde(default)] #[serde(default)]
peers: Vec<Peer>, peers: Vec<Peer>,
/// Settings for the Wireguard interfaces (currently only necessary if you want to use igd features)
#[serde(default)]
interfaces: Vec<InterfaceSetting>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct Peer { struct Peer {
/// The peer's Wireguard public key /// The peer's Wireguard public key
pubkey: Pubkey, pubkey: Pubkey,
/// The peer's Wireguard address /// The destination used for gossip packets
address: IpAddr, address: IpAddr,
/// An optionnal Wireguard endpoint used to initialize a connection to this peer /// The Wireguard interface name used to communicate with this peer
interface: String,
/// The endpoint port
port: Option<u16>,
/// An optional Wireguard endpoint used to initialize a connection to this peer
endpoint: Option<String>, endpoint: Option<String>,
} }
/// Settings for Wireguard interfaces
#[derive(Deserialize)]
struct InterfaceSetting {
/// The Wireguard interface name
name: String,
/// Forward an external port to Wiregard using UPnP IGD.
upnp_forward_external_port: Option<u16>,
}
fn main() -> Result<()> { fn main() -> Result<()> {
pretty_env_logger::init(); pretty_env_logger::init();
@ -115,9 +127,15 @@ fn kdf(secret: &str) -> xsalsa20poly1305::Key {
hash.as_bytes().clone().into() hash.as_bytes().clone().into()
} }
fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAddr>, u64)>)> { struct IfInfo {
our_pubkey: Pubkey,
listen_port: u16,
peers: Vec<(Pubkey, Option<SocketAddr>, u64)>,
}
fn wg_dump(interface: &str) -> Result<IfInfo> {
let output = Command::new("wg") let output = Command::new("wg")
.args(["show", &config.interface, "dump"]) .args(["show", interface, "dump"])
.output()?; .output()?;
let mut lines = std::str::from_utf8(&output.stdout)?.split('\n'); let mut lines = std::str::from_utf8(&output.stdout)?.split('\n');
@ -125,7 +143,7 @@ fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAd
if ourself.len() < 3 { if ourself.len() < 3 {
bail!( bail!(
"Unable to fetch wireguard status for interface {}", "Unable to fetch wireguard status for interface {}",
config.interface interface
); );
} }
let our_pubkey = ourself[1].to_string(); let our_pubkey = ourself[1].to_string();
@ -146,7 +164,7 @@ fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAd
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok((our_pubkey, listen_port, peers)) Ok(IfInfo { our_pubkey, listen_port, peers })
} }
// ============ DAEMON CODE ================= // ============ DAEMON CODE =================
@ -154,10 +172,9 @@ fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAd
struct Daemon { struct Daemon {
config: Config, config: Config,
gossip_key: xsalsa20poly1305::Key, gossip_key: xsalsa20poly1305::Key,
our_pubkey: Pubkey,
listen_port: u16,
socket: UdpSocket, socket: UdpSocket,
state: Mutex<State>, state: Mutex<State>,
our_pubkey: Pubkey,
} }
struct PeerInfo { struct PeerInfo {
@ -165,10 +182,10 @@ struct PeerInfo {
gossip_ip: IpAddr, gossip_ip: IpAddr,
gossip_prio: u64, gossip_prio: u64,
// Info retrieved from wireguard // Info retrieved from wireguard
endpoint: Option<SocketAddr>, endpoint: Option<IpAddr>,
last_seen: u64, last_seen: u64,
// Info received by LAN broadcast // Info received by LAN broadcast
lan_endpoint: Option<(SocketAddr, u64)>, lan_endpoint: Option<(IpAddr, u64)>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -177,12 +194,11 @@ enum Gossip {
Pong, Pong,
Announce { Announce {
pubkey: Pubkey, pubkey: Pubkey,
endpoints: Vec<(SocketAddr, u64)>, endpoints: Vec<(IpAddr, u64)>,
}, },
Request, Request,
LanBroadcast { LanBroadcast {
pubkey: Pubkey, pubkey: Pubkey,
listen_port: u16,
}, },
} }
@ -190,18 +206,35 @@ impl Daemon {
fn new(config: Config) -> Result<Self> { fn new(config: Config) -> Result<Self> {
let gossip_key = kdf(config.gossip_secret.as_deref().unwrap_or_default()); let gossip_key = kdf(config.gossip_secret.as_deref().unwrap_or_default());
let (our_pubkey, listen_port, _peers) = wg_dump(&config)?; let interface_names = config.peers.iter().map(|peer| peer.interface.clone()).collect::<HashSet<_>>();
let socket = UdpSocket::bind(SocketAddr::new("0.0.0.0".parse()?, config.gossip_port))?; let interfaces = interface_names.into_iter().map(|interface_name| wg_dump(&interface_name).map(|ifinfo| (interface_name, ifinfo))).collect::<Result<HashMap<_, _>>>()?;
let socket = UdpSocket::bind(SocketAddr::new("::".parse()?, config.gossip_port))?;
socket.set_broadcast(true)?; socket.set_broadcast(true)?;
socket.set_ttl(1)?;
let our_pubkey = interfaces.iter().next().unwrap().1.our_pubkey.clone();
let peers = config.peers.iter().map(|peer_cfg| {
(
peer_cfg.pubkey.clone(),
PeerInfo {
gossip_ip: peer_cfg.address,
gossip_prio: fasthash(format!("{}-{}", our_pubkey, peer_cfg.pubkey).as_bytes()),
endpoint: None, // Is resolved as DNS name later
last_seen: u64::MAX,
lan_endpoint: None,
}
)
}).collect();
Ok(Daemon { Ok(Daemon {
config, config,
gossip_key, gossip_key,
our_pubkey,
listen_port,
socket, socket,
our_pubkey,
state: Mutex::new(State { state: Mutex::new(State {
peers: HashMap::new(), interfaces,
peers,
gossip: HashMap::new(), gossip: HashMap::new(),
}), }),
}) })
@ -236,7 +269,7 @@ impl Daemon {
fn initialize(&self) -> Result<()> { fn initialize(&self) -> Result<()> {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.read_wg_peers(self)?; state.read_wg_peers()?;
state.setup_wg_peers(self, 0)?; state.setup_wg_peers(self, 0)?;
Ok(()) Ok(())
} }
@ -259,7 +292,7 @@ impl Daemon {
error!("Wireguard configuration loop error: {}", e); error!("Wireguard configuration loop error: {}", e);
} }
i = i + 1; i = i + 1;
std::thread::sleep(TRY_INTERVAL); thread::sleep(TRY_INTERVAL);
} }
} }
@ -267,7 +300,7 @@ impl Daemon {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
// 1. Update local peers info of peers // 1. Update local peers info of peers
state.read_wg_peers(self)?; state.read_wg_peers()?;
// 2. Send gossip for peers where there is a big update // 2. Send gossip for peers where there is a big update
let announces = state let announces = state
@ -299,7 +332,7 @@ impl Daemon {
loop { loop {
if let Err(e) = self.recv_loop_iter() { if let Err(e) = self.recv_loop_iter() {
error!("Receive loop error: {}", e); error!("Receive loop error: {}", e);
std::thread::sleep(Duration::from_secs(10)); thread::sleep(Duration::from_secs(10));
} }
} }
} }
@ -322,12 +355,10 @@ impl Daemon {
} }
Gossip::LanBroadcast { Gossip::LanBroadcast {
pubkey, pubkey,
listen_port,
} => { } => {
if self.config.lan_discovery { if self.config.lan_discovery {
if let Some(peer) = state.peers.get_mut(&pubkey) { if let Some(peer) = state.peers.get_mut(&pubkey) {
let addr = SocketAddr::new(from.ip(), listen_port); peer.lan_endpoint = Some((from.ip(), time()));
peer.lan_endpoint = Some((addr, time()));
} }
} }
} }
@ -369,7 +400,7 @@ impl Daemon {
if let Err(e) = self.persist_state(file) { if let Err(e) = self.persist_state(file) {
error!("Could not write persistent state to disk: {}", e); error!("Could not write persistent state to disk: {}", e);
} }
std::thread::sleep(PERSIST_INTERVAL); thread::sleep(PERSIST_INTERVAL);
} }
} }
} }
@ -386,7 +417,7 @@ impl Daemon {
if let Err(e) = self.lan_broadcast_iter() { if let Err(e) = self.lan_broadcast_iter() {
error!("LAN broadcast loop error: {}", e); error!("LAN broadcast loop error: {}", e);
} }
std::thread::sleep(LAN_BROADCAST_INTERVAL); thread::sleep(LAN_BROADCAST_INTERVAL);
} }
} }
} }
@ -394,59 +425,76 @@ impl Daemon {
fn lan_broadcast_iter(&self) -> Result<()> { fn lan_broadcast_iter(&self) -> Result<()> {
let packet = self.make_packet(&Gossip::LanBroadcast { let packet = self.make_packet(&Gossip::LanBroadcast {
pubkey: self.our_pubkey.clone(), pubkey: self.our_pubkey.clone(),
listen_port: self.listen_port,
})?; })?;
let addr = SocketAddr::new("255.255.255.255".parse().unwrap(), self.config.gossip_port); let broadcast_addr = SocketAddr::new("255.255.255.255".parse()?, self.config.gossip_port);
self.socket.send_to(&packet, addr)?; let broadcast_addrv6 = SocketAddr::new("ff05::1".parse()?, self.config.gossip_port);
self.socket.send_to(&packet, broadcast_addr)?;
self.socket.send_to(&packet, broadcast_addrv6)?;
Ok(()) Ok(())
} }
fn igd_loop(&self) { fn igd_loop(&self) {
if let Some(external_port) = self.config.upnp_forward_external_port { if self.config.interfaces.iter().any(|interface| interface.upnp_forward_external_port.is_some()) {
let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
rt.block_on(async {
loop { loop {
if let Err(e) = self.igd_loop_iter(external_port) { let (portmap_v4, portmap_v6) = self.generate_portmaps();
if !portmap_v4.is_empty() || !portmap_v6.is_empty() {
if let Err(e) = igd_loop_iter(portmap_v4, portmap_v6).await {
error!("IGD loop error: {}", e); error!("IGD loop error: {}", e);
} }
std::thread::sleep(IGD_INTERVAL); }
} tokio::time::sleep(IGD_INTERVAL).await;
}
});
} }
} }
fn generate_portmaps(&self) -> (HashMap<u16,u16>, Vec<u16>) {
let mut portmap_v4: HashMap<u16, u16> = HashMap::new();
let mut v6_internal_ports: Vec<u16> = Vec::new();
fn igd_loop_iter(&self, external_port: u16) -> Result<()> { //collect interfaces that have peers with IPv4 addresses (-> IPv4 port mappings need to exist)
let gateway = igd::search_gateway(Default::default())?; let binding = self.state.lock().unwrap();
let interfaces = binding.interfaces.iter().clone();
let gwa = gateway.addr.ip().octets(); let v4ifs: Vec<_> = interfaces.clone()
let cmplen = match gwa { .filter(|(ifname, ifinfo)| {
[192, 168, _, _] => 3, ifinfo.listen_port!=0
[10, _, _, _] => 2, && self.config
_ => bail!( .peers
"Gateway IP does not appear to be in a local network ({})", .iter()
gateway.addr.ip() .any(|peer| peer.interface == **ifname && peer.address.is_ipv4())
),
};
let private_ip = get_if_addrs::get_if_addrs()?
.into_iter()
.map(|i| i.addr.ip())
.filter_map(|a| match a {
std::net::IpAddr::V4(a4) if a4.octets()[..cmplen] == gwa[..cmplen] => Some(a4),
_ => None,
}) })
.next() .collect();
.ok_or(anyhow!("No interface has an IP on same subnet as gateway"))?;
debug!(
"IGD: gateway is {}, private IP is {}, making announce",
gateway.addr, private_ip
);
gateway.add_port( //collect interfaces that have peers with IPv6 addresses (-> pinholes for the IPv6 listen ports should be created)
igd::PortMappingProtocol::UDP, let v6ifs: Vec<_> = interfaces
external_port, .filter(|(ifname, ifinfo)| {
SocketAddrV4::new(private_ip, self.listen_port), ifinfo.listen_port!=0
IGD_LEASE_DURATION.as_secs() as u32, && self.config
"Wireguard via wgautomesh", .peers
)?; .iter()
.any(|peer| peer.interface == **ifname && peer.address.is_ipv6())
})
.collect();
for ifsetting in &self.config.interfaces {
if let Some(external_port) = ifsetting.upnp_forward_external_port {
if let Some((_ifname, ifinfo)) = v4ifs
.iter()
.find(|(ifname, _)| **ifname == ifsetting.name)
{
portmap_v4.insert(ifinfo.listen_port, external_port);
}
if let Some((_ifname, ifinfo)) = v6ifs
.iter()
.find(|(ifname, _)| **ifname == ifsetting.name)
{
v6_internal_ports.push(ifinfo.listen_port);
}
}
}
(portmap_v4, v6_internal_ports)
Ok(())
} }
fn make_packet(&self, gossip: &Gossip) -> Result<Vec<u8>> { fn make_packet(&self, gossip: &Gossip) -> Result<Vec<u8>> {
@ -469,7 +517,8 @@ impl Daemon {
struct State { struct State {
peers: HashMap<Pubkey, PeerInfo>, peers: HashMap<Pubkey, PeerInfo>,
gossip: HashMap<Pubkey, Vec<(SocketAddr, u64)>>, gossip: HashMap<Pubkey, Vec<(IpAddr, u64)>>,
interfaces: HashMap<String, IfInfo>,
} }
impl State { impl State {
@ -481,7 +530,7 @@ impl State {
let mut peer_vec = self let mut peer_vec = self
.peers .peers
.iter() .iter()
.filter(|(_, info)| now < info.last_seen + TIMEOUT.as_secs() && info.endpoint.is_some()) .filter(|(_, info)| info.last_seen != u64::MAX && now < info.last_seen + TIMEOUT.as_secs() && info.endpoint.is_some())
.map(|(_, info)| (info.gossip_ip, info.gossip_prio)) .map(|(_, info)| (info.gossip_ip, info.gossip_prio))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
peer_vec.sort_by_key(|(_, prio)| *prio); peer_vec.sort_by_key(|(_, prio)| *prio);
@ -499,7 +548,7 @@ impl State {
&mut self, &mut self,
daemon: &Daemon, daemon: &Daemon,
pubkey: Pubkey, pubkey: Pubkey,
mut endpoints: Vec<(SocketAddr, u64)>, mut endpoints: Vec<(IpAddr, u64)>,
) -> Result<()> { ) -> Result<()> {
let propagate = { let propagate = {
match self.gossip.get_mut(&pubkey) { match self.gossip.get_mut(&pubkey) {
@ -541,36 +590,21 @@ impl State {
Ok(()) Ok(())
} }
fn read_wg_peers(&mut self, daemon: &Daemon) -> Result<()> { fn read_wg_peers(&mut self) -> Result<()> {
let (_, _, wg_peers) = wg_dump(&daemon.config)?;
// Clear old known endpoints if any // Clear old known endpoints if any
for (_, peer) in self.peers.iter_mut() { for (_, peer) in self.peers.iter_mut() {
peer.endpoint = None; peer.endpoint = None;
} }
for (pk, endpoint, last_seen) in wg_peers { for (ifname, ifinfo) in &mut self.interfaces {
match self.peers.get_mut(&pk) { *ifinfo = wg_dump(&ifname)?;
Some(i) => { for (pk, endpoint, last_seen) in &mut ifinfo.peers {
i.endpoint = endpoint; if let Some(i) = self.peers.get_mut(&*pk) {
i.last_seen = last_seen; i.endpoint = endpoint.as_ref().map(SocketAddr::ip);
} i.last_seen = *last_seen;
None => { } else {
let gossip_ip = match daemon.config.peers.iter().find(|x| x.pubkey == pk) { warn!("unknown peer: {}", pk);
Some(x) => x.address,
None => continue,
};
let gossip_prio = fasthash(format!("{}-{}", daemon.our_pubkey, pk).as_bytes());
self.peers.insert(
pk,
PeerInfo {
endpoint,
lan_endpoint: None,
gossip_prio,
gossip_ip,
last_seen,
},
);
} }
} }
} }
@ -601,11 +635,11 @@ impl State {
// if peer is connected and endpoint is the correct one, // if peer is connected and endpoint is the correct one,
// set higher keepalive and then skip reconfiguring it // set higher keepalive and then skip reconfiguring it
if !bad_endpoint && now < peer.last_seen + TIMEOUT.as_secs() { if !bad_endpoint && peer.last_seen != u64::MAX && now < peer.last_seen + TIMEOUT.as_secs() {
Command::new("wg") Command::new("wg")
.args([ .args([
"set", "set",
&daemon.config.interface, &peer_cfg.interface,
"peer", "peer",
&peer_cfg.pubkey, &peer_cfg.pubkey,
"persistent-keepalive", "persistent-keepalive",
@ -631,11 +665,11 @@ impl State {
.cloned() .cloned()
.unwrap_or_default(); .unwrap_or_default();
if let Some(endpoint) = &peer_cfg.endpoint { if let Some(endpoint) = &peer_cfg.endpoint {
match endpoint.to_socket_addrs() { match format!("{}:0", endpoint).to_socket_addrs() {
Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e), Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e),
Ok(iter) => { Ok(iter) => {
for addr in iter { for addr in iter {
endpoints.push((addr, 0)); endpoints.push((addr.ip(), 0));
} }
} }
} }
@ -657,20 +691,20 @@ impl State {
// Skip if we are already using that endpoint // Skip if we are already using that endpoint
continue; continue;
} }
if let Some(port) = peer_cfg.port {
info!("Configure {} with endpoint {}", peer_cfg.pubkey, endpoint); info!("Configure {} with endpoint {}", peer_cfg.pubkey, endpoint);
Command::new("wg") Command::new("wg")
.args([ .args([
"set", "set",
&daemon.config.interface, &peer_cfg.interface,
"peer", "peer",
&peer_cfg.pubkey, &peer_cfg.pubkey,
"endpoint", "endpoint",
&endpoint.to_string(), &SocketAddr::new(endpoint, port).to_string(),
"persistent-keepalive", "persistent-keepalive",
"10", "10",
"allowed-ips", "allowed-ips",
&format!("{}/32", peer_cfg.address), "::/0,0.0.0.0/0"
]) ])
.output()?; .output()?;
let packet = daemon.make_packet(&Gossip::Ping)?; let packet = daemon.make_packet(&Gossip::Ping)?;
@ -678,16 +712,17 @@ impl State {
&packet, &packet,
SocketAddr::new(peer_cfg.address, daemon.config.gossip_port), SocketAddr::new(peer_cfg.address, daemon.config.gossip_port),
)?; )?;
}
} else { } else {
info!("Configure {} with no known endpoint", peer_cfg.pubkey); info!("Configure {} with no known endpoint", peer_cfg.pubkey);
Command::new("wg") Command::new("wg")
.args([ .args([
"set", "set",
&daemon.config.interface, &peer_cfg.interface,
"peer", "peer",
&peer_cfg.pubkey, &peer_cfg.pubkey,
"allowed-ips", "allowed-ips",
&format!("{}/32", peer_cfg.address), "::/0,0.0.0.0/0"
]) ])
.output()?; .output()?;
} }