Compare commits

...

10 commits

Author SHA1 Message Date
Lyn
53b7a6b6ed made IGD implementation compatible with IPv6 implementation, updated documentation for config structure changes 2025-01-14 15:07:07 +01:00
Lyn
69c175059c int overflow no. 2 fixed 2025-01-14 13:55:22 +01:00
Lyn
464188f945 integer overflow fix 2025-01-13 16:44:47 +01:00
Lyn
3943972be3 Merge remote-tracking branch 'yuka/main'
# Conflicts:
#	Cargo.lock
#	Cargo.toml
#	src/main.rs
2025-01-11 13:41:56 +01:00
Lyn
f2ca3b6e6c added IPv6 support; rewrote the igd implementation 2025-01-11 12:33:36 +01:00
Yureka
e039471e82 update cargo dependencies 2024-05-22 12:01:02 +02:00
Yureka
503f94ceeb fix 2023-11-18 19:14:06 +01:00
Yureka
d33e26d5ce fix 2023-11-18 18:55:10 +01:00
Yureka
1680284107 set ttl 2023-11-18 17:41:40 +01:00
Yureka
ff581dff6f support multiple interfaces 2023-11-18 16:26:09 +01:00
9 changed files with 2152 additions and 840 deletions

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
/target /target
result result
result-bin result-bin

811
Cargo.lock generated

File diff suppressed because it is too large Load diff

1604
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package] [package]
name = "wgautomesh" name = "wgautomesh"
version = "0.1.0" version = "0.1.1"
edition = "2021" edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -11,11 +11,12 @@ anyhow = "1.0"
log = "0.4" log = "0.4"
pretty_env_logger = "0.5" pretty_env_logger = "0.5"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0.215", features = ["derive"] }
bincode = "1.3" bincode = "1.3"
toml = { version = "0.8", default-features = false, features = ["parse"] } toml = { version = "0.8", default-features = false, features = ["parse"] }
xsalsa20poly1305 = "0.9" xsalsa20poly1305 = "0.9"
blake3 = "1.5" blake3 = "1.5"
pnet = "0.35.0"
igd = { version = "0.12", default-features = false } rupnp = "2.0.0"
get_if_addrs = "0.5" tokio = { version = "1.41.1", features = ["rt", "rt-multi-thread", "macros"] }
futures = "0.3.31"

View file

@ -32,9 +32,6 @@ and closely mirror the structure of the configuration file described below.
### Sample configuration file ### Sample configuration file
```toml ```toml
# The Wireguard interface to control.
interface = "wg0"
# The port wgautomesh will use to communicate from node to node. Wgautomesh # The port wgautomesh will use to communicate from node to node. Wgautomesh
# gossip communications occur inside the wireguard mesh network. # gossip communications occur inside the wireguard mesh network.
gossip_port = 1666 gossip_port = 1666
@ -42,10 +39,6 @@ gossip_port = 1666
# Enable discovery of other wgautomesh nodes on the same LAN using UDP broadcast. # Enable discovery of other wgautomesh nodes on the same LAN using UDP broadcast.
lan_discovery = true lan_discovery = true
# Enables UPnP/IGD forwarding of an external port to the Wireguard listening port
# on this node, for compatible routers/gateways.
upnp_forward_external_port = 33723
# The path to a file that contains the encryption secret wgautomesh uses to # The path to a file that contains the encryption secret wgautomesh uses to
# communicate. This secret can be any arbitrary utf-8 string. The following # communicate. This secret can be any arbitrary utf-8 string. The following
# command can be used to generate a new secret: # command can be used to generate a new secret:
@ -58,27 +51,51 @@ gossip_secret_file = "/var/lib/wgautomesh/gossip_secret"
# `[[peers]]` section when trying to establish connectivity. # `[[peers]]` section when trying to establish connectivity.
persist_file = "/var/lib/wgautomesh/state" persist_file = "/var/lib/wgautomesh/state"
[[peers]] # Configuration for a wireguard interface
pubkey = "7Nm7pMmyS7Nts1MB+loyD8u84ODxHPTkDu+uqQR6yDk=" [[interfaces]]
address = "10.14.1.2" # Interface name
endpoint = "77.207.15.215:33722" name = "wg0"
# Enables UPnP/IGD forwarding of an external port to the Wireguard listening port
# on this interface, for compatible routers/gateways.
# For IPv6 only interfaces, just assign any port here to enable IGD since the Wireguard
# interfaces listen port will always be pinholed instead.
upnp_forward_external_port = 51820
[[peers]] [[peers]]
# The Wireguard interface to use to connect with this peer.
interface = "wg0"
# Pubkey of the other peer
pubkey = "7Nm7pMmyS7Nts1MB+loyD8u84ODxHPTkDu+uqQR6yDk="
# Mesh-internal wireguard address
address = "10.14.1.2"
# (Optional) endpoint address of this peer
endpoint = "77.207.15.215"
# (Optional) endpoint port
port = 33722
[[peers]]
interface = "wg0"
pubkey = "lABn/axzD1jkFulX8c+K3B3CbKXORlIMDDoe8sQVxhs=" pubkey = "lABn/axzD1jkFulX8c+K3B3CbKXORlIMDDoe8sQVxhs="
address = "10.14.1.3" address = "10.14.1.3"
endpoint = "77.207.15.215:33723" endpoint = "77.207.15.215"
port = 33723
[[peers]] [[peers]]
interface = "wg0"
pubkey = "XLOYoMXF+PO4jcgfSVAk+thh4VmWx0wzWnb0xs08G1s=" pubkey = "XLOYoMXF+PO4jcgfSVAk+thh4VmWx0wzWnb0xs08G1s="
address = "10.14.4.1" address = "10.14.4.1"
endpoint = "bitfrost.fiber.shirokumo.net:33734" endpoint = "bitfrost.fiber.shirokumo.net"
port = 33734
[[peers]] [[peers]]
interface = "wg0"
pubkey = "smBQYUS60JDkNoqkTT7TgbpqFiM43005fcrT6472llI=" pubkey = "smBQYUS60JDkNoqkTT7TgbpqFiM43005fcrT6472llI="
address = "10.14.2.33" address = "10.14.2.33"
endpoint = "82.64.238.84:33733" endpoint = "82.64.238.84"
port = 33733
[[peers]] [[peers]]
interface = "wg0"
pubkey = "m9rLf+233X1VColmeVrM/xfDGro5W6Gk5N0zqcf32WY=" pubkey = "m9rLf+233X1VColmeVrM/xfDGro5W6Gk5N0zqcf32WY="
address = "10.14.3.1" address = "10.14.3.1"
``` ```

View file

@ -1,2 +1,3 @@
interface = "route48"
gossip_port = 1134 gossip_port = 1134
[[interfaces]]
name = "wg0"

2
rust-toolchain.toml Normal file
View file

@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"

219
src/igd.rs Normal file
View file

@ -0,0 +1,219 @@
use std::collections::HashMap;
use log::*;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use futures::stream::{StreamExt};
use rupnp::{Device, Service};
use rupnp::ssdp::{SearchTarget, URN};
use std::str::FromStr;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result, Error};
const IGD_LEASE_DURATION: Duration = Duration::from_secs(300);
const WAN_IPV6_FIREWALL_CONTROL: URN = URN::service("schemas-upnp-org", "WANIPv6FirewallControl", 1);
const WAN_IP_CONNECTION: URN = URN::service("schemas-upnp-org", "WANIPConnection", 1);
pub async fn igd_loop_iter(portmap_v4:HashMap<u16,u16>,portmap_v6: Vec<u16>) -> Result<()> {
let lease_duration: u64 = IGD_LEASE_DURATION.as_secs();
//find gateway compatible with publishing the port for required IP version
let gateway = find_gateway(portmap_v6.len() >0).await?;
let gateway_ip: IpAddr = IpAddr::from_str(gateway.url().host().unwrap())?;
//find corresponding interface local IP to forward in gateway
let (local_ipv4, local_ipv6) = select_local_ip_for_gateway(gateway_ip)?;
debug!(
"Found gateway: {:?} at IP {}, making announce",
gateway.friendly_name(),
gateway.url().host().unwrap()
);
if local_ipv6.is_some() {
for listen_port in &portmap_v6 {
create_ipv6_firewall_pinhole(
&gateway,
&IpAddr::V6(local_ipv6.unwrap()),
listen_port,
&lease_duration,
)
.await?
}
}
if local_ipv4.is_some() {
for (listen_port, external_port) in &portmap_v4 {
create_ipv4_port_mapping(
&gateway,
&IpAddr::V4(local_ipv4.unwrap()),
listen_port,
external_port,
&lease_duration,
)
.await?
}
}
Ok(())
}
/// Create a port mapping to forward a given Port for a given internal IPv4
async fn create_ipv4_port_mapping(
gateway: &Device,
internal_ip: &IpAddr,
listen_port: &u16,
external_port: &u16,
lease_duration: &u64,
) -> Result<()> {
let wan_ip_con_service = gateway
.find_service(&WAN_IP_CONNECTION)
.expect("Gateway passed doesn't offer the required service to create IPv4 port mapping");
let arguments = format!(
"<NewRemoteHost/>
<NewExternalPort>{external_port}</NewExternalPort>
<NewProtocol>UDP</NewProtocol>
<NewInternalPort>{listen_port}</NewInternalPort>
<NewInternalClient>{internal_ip}</NewInternalClient>
<NewEnabled>1</NewEnabled>
<NewPortMappingDescription>Wireguard via wgautomesh</NewPortMappingDescription>
<NewLeaseDuration>{lease_duration}</NewLeaseDuration>"
);
debug!(
"Adding port mapping for internal IP {} on internal port {}, external port {}",
internal_ip, listen_port, external_port,
);
let result = wan_ip_con_service
.action(gateway.url(), "AddPortMapping", &arguments)
.await;
if result.is_ok() {
Ok(())
} else {
bail!(
"Error trying to add IPv4 port mapping: {}.\
Note: Have you checked whether your router allows this device to create (IPv4) port mappings?",
result.err().unwrap()
);
}
}
/// Create a pinhole for a given IPv6 address on a given port
async fn create_ipv6_firewall_pinhole(gateway: &Device,
ip: &IpAddr,
listen_port: &u16,
lease_duration: &u64,
) -> Result<()> {
let wan_ip6_fw_con = gateway
.find_service(&WAN_IPV6_FIREWALL_CONTROL)
.expect("Gateway passed doesn't offer the required service to create IPv6 pinholes");
let (firewall_enabled, can_create_inbound_pinhole) =
get_firewall_status(&gateway, &wan_ip6_fw_con).await;
if !firewall_enabled {
debug!("Gateway firewall is not enabled, incoming connections should be allowed as-is on all ports");
return Ok(());
} else if !can_create_inbound_pinhole {
bail!("Gateway said creating inbound IPv6 pinholes isn't allowed")
}
let arguments = format!(
"<RemoteHost/>
<RemotePort/>
<Protocol>17</Protocol>
<InternalPort>{listen_port}</InternalPort>
<InternalClient>{ip}</InternalClient>
<LeaseTime>{lease_duration}</LeaseTime>"
);
debug!(
"Opening firewall pinhole for IP {} on port {}",
ip, listen_port,
);
let result = wan_ip6_fw_con
.action(gateway.url(), "AddPinhole", &arguments)
.await;
if result.is_ok() {
Ok(())
} else {
bail!(
"Error trying to open IPv6 pinhole: {}\
Note: Have you checked whether your router allows this device to create (IPv6) pinholes?",
result.err().unwrap()
);
}
}
/// Asks the Gateway for the IPv6 Firewall status (whether the firewall is enabled AND whether devices in this network are allowed to create IPv6 pinholes per policy).
/// Note: This only works on IGDv2 supporting firewalls (-> any firewall that can do IPv6)
async fn get_firewall_status(gateway: &Device, igd_service: &Service) -> (bool, bool) {
let firewall_status_response = igd_service
.action(gateway.url(), "GetFirewallStatus", "")
.await
.unwrap();
let firewall_enabled: bool =
(u32::from_str(firewall_status_response.get("FirewallEnabled").unwrap()).unwrap()) != 0;
let can_create_inbound_pinhole: bool = u32::from_str(
firewall_status_response
.get("InboundPinholeAllowed")
.unwrap(),
)
.unwrap()
!= 0;
(firewall_enabled, can_create_inbound_pinhole)
}
/// Find a Gateway compatible with either IPv4 (supports WANIPConnection) or IPv6 (supports WANIPv6FirewallControl)
async fn find_gateway(ipv6_required: bool) -> Result<Device, Error> {
let search_urn: URN = if ipv6_required {
WAN_IPV6_FIREWALL_CONTROL
} else {
WAN_IP_CONNECTION
};
let discovered_devices = rupnp::discover(
&SearchTarget::URN(search_urn.clone()),
Duration::from_secs(3),
)
.await?
.filter_map(|result| async {
match result {
Ok(device) => Some(device),
Err(_) => None,
}
});
futures::pin_mut!(discovered_devices);
discovered_devices.next().await.ok_or_else(||anyhow!("Couldn't find any gateways supporting {}. Is port 1900 open for incoming connections from local networks?", search_urn.typ()))
}
/// Returns a list of IPs assigned to interfaces that are in the same subnet as a given IP
fn interface_ips_in_same_subnet_as(ip_to_match: IpAddr) -> Result<Vec<IpAddr>, Error> {
let interfaces = pnet::datalink::interfaces();
let ipnets = interfaces
.iter()
.filter_map(|interface| {
if interface
.ips
.iter()
.any(|ipnetwork| ipnetwork.contains(ip_to_match))
{
Some(interface.ips.clone())
} else {
None
}
})
.next()
.context("Couldn't find any local IPs within the same network as given IP")?;
let ips = ipnets.iter().map(|ip| ip.ip()).collect();
Ok(ips)
}
/// Selects the local IP we tell the Gateway to port forward to (/pinhole) later on
/// Note: As soon as this[https://github.com/jakobhellermann/ssdp-client/issues/11] is fixed and the dependency is upgraded in rupnp, we can simplify it
fn select_local_ip_for_gateway(gateway: IpAddr) -> Result<(Option<Ipv4Addr>,Option<Ipv6Addr>), Error> {
//get IPs in same subnet as Gateway
let ips = interface_ips_in_same_subnet_as(gateway)?;
//removes IPv6 that are not globally routable as pinholing those would be pointless
let v6_cleaned_up = ips
.iter()
.filter(|ip| ip.is_ipv4() || (ip.is_global() && ip.is_ipv6()));
//get the first IPv4 and IPv6 found
let mut first_ipv4 :Option<Ipv4Addr> = None;
let mut first_ipv6 :Option<Ipv6Addr> = None;
for ip in v6_cleaned_up {
match ip {
IpAddr::V4(ipv4) => { if first_ipv4.is_none() { first_ipv4 = Some(ipv4.clone()); } }
IpAddr::V6(ipv6) => { if first_ipv6.is_none() { first_ipv6 = Some(ipv6.clone()); } }
}
if first_ipv4.is_some() && first_ipv6.is_some() {
break;
}
}
if first_ipv4.is_none() && first_ipv6.is_none() {
bail!("Couldn't find any IP address")
}
Ok((first_ipv4, first_ipv6))
}

View file

@ -1,5 +1,8 @@
use std::collections::HashMap; #![feature(ip)]
use std::net::{IpAddr, SocketAddr, SocketAddrV4, ToSocketAddrs, UdpSocket}; mod igd;
use igd::*;
use std::collections::{HashMap, HashSet};
use std::net::{IpAddr, SocketAddr, ToSocketAddrs, UdpSocket};
use std::process::Command; use std::process::Command;
use std::sync::Mutex; use std::sync::Mutex;
use std::thread; use std::thread;
@ -26,14 +29,10 @@ const PERSIST_INTERVAL: Duration = Duration::from_secs(600);
const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60); const LAN_BROADCAST_INTERVAL: Duration = Duration::from_secs(60);
const IGD_INTERVAL: Duration = Duration::from_secs(60); const IGD_INTERVAL: Duration = Duration::from_secs(60);
const IGD_LEASE_DURATION: Duration = Duration::from_secs(300);
type Pubkey = String; type Pubkey = String;
#[derive(Deserialize)] #[derive(Deserialize)]
struct Config { struct Config {
/// The Wireguard interface name
interface: Pubkey,
/// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes) /// The port to use for gossip inside the Wireguard mesh (must be the same on all nodes)
gossip_port: u16, gossip_port: u16,
/// The secret to use to authenticate nodes between them /// The secret to use to authenticate nodes between them
@ -46,23 +45,36 @@ struct Config {
#[serde(default)] #[serde(default)]
lan_discovery: bool, lan_discovery: bool,
/// Forward an external port to Wiregard using UPnP IGD
upnp_forward_external_port: Option<u16>,
/// The list of peers we try to connect to /// The list of peers we try to connect to
#[serde(default)] #[serde(default)]
peers: Vec<Peer>, peers: Vec<Peer>,
/// Settings for the Wireguard interfaces (currently only necessary if you want to use igd features)
#[serde(default)]
interfaces: Vec<InterfaceSetting>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
struct Peer { struct Peer {
/// The peer's Wireguard public key /// The peer's Wireguard public key
pubkey: Pubkey, pubkey: Pubkey,
/// The peer's Wireguard address /// The destination used for gossip packets
address: IpAddr, address: IpAddr,
/// An optionnal Wireguard endpoint used to initialize a connection to this peer /// The Wireguard interface name used to communicate with this peer
interface: String,
/// The endpoint port
port: Option<u16>,
/// An optional Wireguard endpoint used to initialize a connection to this peer
endpoint: Option<String>, endpoint: Option<String>,
} }
/// Settings for Wireguard interfaces
#[derive(Deserialize)]
struct InterfaceSetting {
/// The Wireguard interface name
name: String,
/// Forward an external port to Wiregard using UPnP IGD.
upnp_forward_external_port: Option<u16>,
}
fn main() -> Result<()> { fn main() -> Result<()> {
pretty_env_logger::init(); pretty_env_logger::init();
@ -115,9 +127,15 @@ fn kdf(secret: &str) -> xsalsa20poly1305::Key {
hash.as_bytes().clone().into() hash.as_bytes().clone().into()
} }
fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAddr>, u64)>)> { struct IfInfo {
our_pubkey: Pubkey,
listen_port: u16,
peers: Vec<(Pubkey, Option<SocketAddr>, u64)>,
}
fn wg_dump(interface: &str) -> Result<IfInfo> {
let output = Command::new("wg") let output = Command::new("wg")
.args(["show", &config.interface, "dump"]) .args(["show", interface, "dump"])
.output()?; .output()?;
let mut lines = std::str::from_utf8(&output.stdout)?.split('\n'); let mut lines = std::str::from_utf8(&output.stdout)?.split('\n');
@ -125,7 +143,7 @@ fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAd
if ourself.len() < 3 { if ourself.len() < 3 {
bail!( bail!(
"Unable to fetch wireguard status for interface {}", "Unable to fetch wireguard status for interface {}",
config.interface interface
); );
} }
let our_pubkey = ourself[1].to_string(); let our_pubkey = ourself[1].to_string();
@ -146,7 +164,7 @@ fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAd
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok((our_pubkey, listen_port, peers)) Ok(IfInfo { our_pubkey, listen_port, peers })
} }
// ============ DAEMON CODE ================= // ============ DAEMON CODE =================
@ -154,10 +172,9 @@ fn wg_dump(config: &Config) -> Result<(Pubkey, u16, Vec<(Pubkey, Option<SocketAd
struct Daemon { struct Daemon {
config: Config, config: Config,
gossip_key: xsalsa20poly1305::Key, gossip_key: xsalsa20poly1305::Key,
our_pubkey: Pubkey,
listen_port: u16,
socket: UdpSocket, socket: UdpSocket,
state: Mutex<State>, state: Mutex<State>,
our_pubkey: Pubkey,
} }
struct PeerInfo { struct PeerInfo {
@ -165,10 +182,10 @@ struct PeerInfo {
gossip_ip: IpAddr, gossip_ip: IpAddr,
gossip_prio: u64, gossip_prio: u64,
// Info retrieved from wireguard // Info retrieved from wireguard
endpoint: Option<SocketAddr>, endpoint: Option<IpAddr>,
last_seen: u64, last_seen: u64,
// Info received by LAN broadcast // Info received by LAN broadcast
lan_endpoint: Option<(SocketAddr, u64)>, lan_endpoint: Option<(IpAddr, u64)>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -177,12 +194,11 @@ enum Gossip {
Pong, Pong,
Announce { Announce {
pubkey: Pubkey, pubkey: Pubkey,
endpoints: Vec<(SocketAddr, u64)>, endpoints: Vec<(IpAddr, u64)>,
}, },
Request, Request,
LanBroadcast { LanBroadcast {
pubkey: Pubkey, pubkey: Pubkey,
listen_port: u16,
}, },
} }
@ -190,18 +206,35 @@ impl Daemon {
fn new(config: Config) -> Result<Self> { fn new(config: Config) -> Result<Self> {
let gossip_key = kdf(config.gossip_secret.as_deref().unwrap_or_default()); let gossip_key = kdf(config.gossip_secret.as_deref().unwrap_or_default());
let (our_pubkey, listen_port, _peers) = wg_dump(&config)?; let interface_names = config.peers.iter().map(|peer| peer.interface.clone()).collect::<HashSet<_>>();
let socket = UdpSocket::bind(SocketAddr::new("0.0.0.0".parse()?, config.gossip_port))?; let interfaces = interface_names.into_iter().map(|interface_name| wg_dump(&interface_name).map(|ifinfo| (interface_name, ifinfo))).collect::<Result<HashMap<_, _>>>()?;
let socket = UdpSocket::bind(SocketAddr::new("::".parse()?, config.gossip_port))?;
socket.set_broadcast(true)?; socket.set_broadcast(true)?;
socket.set_ttl(1)?;
let our_pubkey = interfaces.iter().next().unwrap().1.our_pubkey.clone();
let peers = config.peers.iter().map(|peer_cfg| {
(
peer_cfg.pubkey.clone(),
PeerInfo {
gossip_ip: peer_cfg.address,
gossip_prio: fasthash(format!("{}-{}", our_pubkey, peer_cfg.pubkey).as_bytes()),
endpoint: None, // Is resolved as DNS name later
last_seen: u64::MAX,
lan_endpoint: None,
}
)
}).collect();
Ok(Daemon { Ok(Daemon {
config, config,
gossip_key, gossip_key,
our_pubkey,
listen_port,
socket, socket,
our_pubkey,
state: Mutex::new(State { state: Mutex::new(State {
peers: HashMap::new(), interfaces,
peers,
gossip: HashMap::new(), gossip: HashMap::new(),
}), }),
}) })
@ -236,7 +269,7 @@ impl Daemon {
fn initialize(&self) -> Result<()> { fn initialize(&self) -> Result<()> {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.read_wg_peers(self)?; state.read_wg_peers()?;
state.setup_wg_peers(self, 0)?; state.setup_wg_peers(self, 0)?;
Ok(()) Ok(())
} }
@ -259,7 +292,7 @@ impl Daemon {
error!("Wireguard configuration loop error: {}", e); error!("Wireguard configuration loop error: {}", e);
} }
i = i + 1; i = i + 1;
std::thread::sleep(TRY_INTERVAL); thread::sleep(TRY_INTERVAL);
} }
} }
@ -267,7 +300,7 @@ impl Daemon {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
// 1. Update local peers info of peers // 1. Update local peers info of peers
state.read_wg_peers(self)?; state.read_wg_peers()?;
// 2. Send gossip for peers where there is a big update // 2. Send gossip for peers where there is a big update
let announces = state let announces = state
@ -299,7 +332,7 @@ impl Daemon {
loop { loop {
if let Err(e) = self.recv_loop_iter() { if let Err(e) = self.recv_loop_iter() {
error!("Receive loop error: {}", e); error!("Receive loop error: {}", e);
std::thread::sleep(Duration::from_secs(10)); thread::sleep(Duration::from_secs(10));
} }
} }
} }
@ -322,12 +355,10 @@ impl Daemon {
} }
Gossip::LanBroadcast { Gossip::LanBroadcast {
pubkey, pubkey,
listen_port,
} => { } => {
if self.config.lan_discovery { if self.config.lan_discovery {
if let Some(peer) = state.peers.get_mut(&pubkey) { if let Some(peer) = state.peers.get_mut(&pubkey) {
let addr = SocketAddr::new(from.ip(), listen_port); peer.lan_endpoint = Some((from.ip(), time()));
peer.lan_endpoint = Some((addr, time()));
} }
} }
} }
@ -369,7 +400,7 @@ impl Daemon {
if let Err(e) = self.persist_state(file) { if let Err(e) = self.persist_state(file) {
error!("Could not write persistent state to disk: {}", e); error!("Could not write persistent state to disk: {}", e);
} }
std::thread::sleep(PERSIST_INTERVAL); thread::sleep(PERSIST_INTERVAL);
} }
} }
} }
@ -386,7 +417,7 @@ impl Daemon {
if let Err(e) = self.lan_broadcast_iter() { if let Err(e) = self.lan_broadcast_iter() {
error!("LAN broadcast loop error: {}", e); error!("LAN broadcast loop error: {}", e);
} }
std::thread::sleep(LAN_BROADCAST_INTERVAL); thread::sleep(LAN_BROADCAST_INTERVAL);
} }
} }
} }
@ -394,59 +425,76 @@ impl Daemon {
fn lan_broadcast_iter(&self) -> Result<()> { fn lan_broadcast_iter(&self) -> Result<()> {
let packet = self.make_packet(&Gossip::LanBroadcast { let packet = self.make_packet(&Gossip::LanBroadcast {
pubkey: self.our_pubkey.clone(), pubkey: self.our_pubkey.clone(),
listen_port: self.listen_port,
})?; })?;
let addr = SocketAddr::new("255.255.255.255".parse().unwrap(), self.config.gossip_port); let broadcast_addr = SocketAddr::new("255.255.255.255".parse()?, self.config.gossip_port);
self.socket.send_to(&packet, addr)?; let broadcast_addrv6 = SocketAddr::new("ff05::1".parse()?, self.config.gossip_port);
self.socket.send_to(&packet, broadcast_addr)?;
self.socket.send_to(&packet, broadcast_addrv6)?;
Ok(()) Ok(())
} }
fn igd_loop(&self) { fn igd_loop(&self) {
if let Some(external_port) = self.config.upnp_forward_external_port { if self.config.interfaces.iter().any(|interface| interface.upnp_forward_external_port.is_some()) {
loop { let rt = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
if let Err(e) = self.igd_loop_iter(external_port) { rt.block_on(async {
error!("IGD loop error: {}", e); loop {
let (portmap_v4, portmap_v6) = self.generate_portmaps();
if !portmap_v4.is_empty() || !portmap_v6.is_empty() {
if let Err(e) = igd_loop_iter(portmap_v4, portmap_v6).await {
error!("IGD loop error: {}", e);
}
}
tokio::time::sleep(IGD_INTERVAL).await;
} }
std::thread::sleep(IGD_INTERVAL); });
}
} }
} }
fn generate_portmaps(&self) -> (HashMap<u16,u16>, Vec<u16>) {
let mut portmap_v4: HashMap<u16, u16> = HashMap::new();
let mut v6_internal_ports: Vec<u16> = Vec::new();
fn igd_loop_iter(&self, external_port: u16) -> Result<()> { //collect interfaces that have peers with IPv4 addresses (-> IPv4 port mappings need to exist)
let gateway = igd::search_gateway(Default::default())?; let binding = self.state.lock().unwrap();
let interfaces = binding.interfaces.iter().clone();
let gwa = gateway.addr.ip().octets(); let v4ifs: Vec<_> = interfaces.clone()
let cmplen = match gwa { .filter(|(ifname, ifinfo)| {
[192, 168, _, _] => 3, ifinfo.listen_port!=0
[10, _, _, _] => 2, && self.config
_ => bail!( .peers
"Gateway IP does not appear to be in a local network ({})", .iter()
gateway.addr.ip() .any(|peer| peer.interface == **ifname && peer.address.is_ipv4())
),
};
let private_ip = get_if_addrs::get_if_addrs()?
.into_iter()
.map(|i| i.addr.ip())
.filter_map(|a| match a {
std::net::IpAddr::V4(a4) if a4.octets()[..cmplen] == gwa[..cmplen] => Some(a4),
_ => None,
}) })
.next() .collect();
.ok_or(anyhow!("No interface has an IP on same subnet as gateway"))?;
debug!(
"IGD: gateway is {}, private IP is {}, making announce",
gateway.addr, private_ip
);
gateway.add_port( //collect interfaces that have peers with IPv6 addresses (-> pinholes for the IPv6 listen ports should be created)
igd::PortMappingProtocol::UDP, let v6ifs: Vec<_> = interfaces
external_port, .filter(|(ifname, ifinfo)| {
SocketAddrV4::new(private_ip, self.listen_port), ifinfo.listen_port!=0
IGD_LEASE_DURATION.as_secs() as u32, && self.config
"Wireguard via wgautomesh", .peers
)?; .iter()
.any(|peer| peer.interface == **ifname && peer.address.is_ipv6())
})
.collect();
for ifsetting in &self.config.interfaces {
if let Some(external_port) = ifsetting.upnp_forward_external_port {
if let Some((_ifname, ifinfo)) = v4ifs
.iter()
.find(|(ifname, _)| **ifname == ifsetting.name)
{
portmap_v4.insert(ifinfo.listen_port, external_port);
}
if let Some((_ifname, ifinfo)) = v6ifs
.iter()
.find(|(ifname, _)| **ifname == ifsetting.name)
{
v6_internal_ports.push(ifinfo.listen_port);
}
}
}
(portmap_v4, v6_internal_ports)
Ok(())
} }
fn make_packet(&self, gossip: &Gossip) -> Result<Vec<u8>> { fn make_packet(&self, gossip: &Gossip) -> Result<Vec<u8>> {
@ -469,7 +517,8 @@ impl Daemon {
struct State { struct State {
peers: HashMap<Pubkey, PeerInfo>, peers: HashMap<Pubkey, PeerInfo>,
gossip: HashMap<Pubkey, Vec<(SocketAddr, u64)>>, gossip: HashMap<Pubkey, Vec<(IpAddr, u64)>>,
interfaces: HashMap<String, IfInfo>,
} }
impl State { impl State {
@ -481,7 +530,7 @@ impl State {
let mut peer_vec = self let mut peer_vec = self
.peers .peers
.iter() .iter()
.filter(|(_, info)| now < info.last_seen + TIMEOUT.as_secs() && info.endpoint.is_some()) .filter(|(_, info)| info.last_seen != u64::MAX && now < info.last_seen + TIMEOUT.as_secs() && info.endpoint.is_some())
.map(|(_, info)| (info.gossip_ip, info.gossip_prio)) .map(|(_, info)| (info.gossip_ip, info.gossip_prio))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
peer_vec.sort_by_key(|(_, prio)| *prio); peer_vec.sort_by_key(|(_, prio)| *prio);
@ -499,7 +548,7 @@ impl State {
&mut self, &mut self,
daemon: &Daemon, daemon: &Daemon,
pubkey: Pubkey, pubkey: Pubkey,
mut endpoints: Vec<(SocketAddr, u64)>, mut endpoints: Vec<(IpAddr, u64)>,
) -> Result<()> { ) -> Result<()> {
let propagate = { let propagate = {
match self.gossip.get_mut(&pubkey) { match self.gossip.get_mut(&pubkey) {
@ -541,36 +590,21 @@ impl State {
Ok(()) Ok(())
} }
fn read_wg_peers(&mut self, daemon: &Daemon) -> Result<()> { fn read_wg_peers(&mut self) -> Result<()> {
let (_, _, wg_peers) = wg_dump(&daemon.config)?;
// Clear old known endpoints if any // Clear old known endpoints if any
for (_, peer) in self.peers.iter_mut() { for (_, peer) in self.peers.iter_mut() {
peer.endpoint = None; peer.endpoint = None;
} }
for (pk, endpoint, last_seen) in wg_peers { for (ifname, ifinfo) in &mut self.interfaces {
match self.peers.get_mut(&pk) { *ifinfo = wg_dump(&ifname)?;
Some(i) => { for (pk, endpoint, last_seen) in &mut ifinfo.peers {
i.endpoint = endpoint; if let Some(i) = self.peers.get_mut(&*pk) {
i.last_seen = last_seen; i.endpoint = endpoint.as_ref().map(SocketAddr::ip);
} i.last_seen = *last_seen;
None => { } else {
let gossip_ip = match daemon.config.peers.iter().find(|x| x.pubkey == pk) { warn!("unknown peer: {}", pk);
Some(x) => x.address,
None => continue,
};
let gossip_prio = fasthash(format!("{}-{}", daemon.our_pubkey, pk).as_bytes());
self.peers.insert(
pk,
PeerInfo {
endpoint,
lan_endpoint: None,
gossip_prio,
gossip_ip,
last_seen,
},
);
} }
} }
} }
@ -601,11 +635,11 @@ impl State {
// if peer is connected and endpoint is the correct one, // if peer is connected and endpoint is the correct one,
// set higher keepalive and then skip reconfiguring it // set higher keepalive and then skip reconfiguring it
if !bad_endpoint && now < peer.last_seen + TIMEOUT.as_secs() { if !bad_endpoint && peer.last_seen != u64::MAX && now < peer.last_seen + TIMEOUT.as_secs() {
Command::new("wg") Command::new("wg")
.args([ .args([
"set", "set",
&daemon.config.interface, &peer_cfg.interface,
"peer", "peer",
&peer_cfg.pubkey, &peer_cfg.pubkey,
"persistent-keepalive", "persistent-keepalive",
@ -631,11 +665,11 @@ impl State {
.cloned() .cloned()
.unwrap_or_default(); .unwrap_or_default();
if let Some(endpoint) = &peer_cfg.endpoint { if let Some(endpoint) = &peer_cfg.endpoint {
match endpoint.to_socket_addrs() { match format!("{}:0", endpoint).to_socket_addrs() {
Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e), Err(e) => error!("Could not resolve DNS for {}: {}", endpoint, e),
Ok(iter) => { Ok(iter) => {
for addr in iter { for addr in iter {
endpoints.push((addr, 0)); endpoints.push((addr.ip(), 0));
} }
} }
} }
@ -657,37 +691,38 @@ impl State {
// Skip if we are already using that endpoint // Skip if we are already using that endpoint
continue; continue;
} }
if let Some(port) = peer_cfg.port {
info!("Configure {} with endpoint {}", peer_cfg.pubkey, endpoint); info!("Configure {} with endpoint {}", peer_cfg.pubkey, endpoint);
Command::new("wg") Command::new("wg")
.args([ .args([
"set", "set",
&daemon.config.interface, &peer_cfg.interface,
"peer", "peer",
&peer_cfg.pubkey, &peer_cfg.pubkey,
"endpoint", "endpoint",
&endpoint.to_string(), &SocketAddr::new(endpoint, port).to_string(),
"persistent-keepalive", "persistent-keepalive",
"10", "10",
"allowed-ips", "allowed-ips",
&format!("{}/32", peer_cfg.address), "::/0,0.0.0.0/0"
]) ])
.output()?; .output()?;
let packet = daemon.make_packet(&Gossip::Ping)?; let packet = daemon.make_packet(&Gossip::Ping)?;
daemon.socket.send_to( daemon.socket.send_to(
&packet, &packet,
SocketAddr::new(peer_cfg.address, daemon.config.gossip_port), SocketAddr::new(peer_cfg.address, daemon.config.gossip_port),
)?; )?;
}
} else { } else {
info!("Configure {} with no known endpoint", peer_cfg.pubkey); info!("Configure {} with no known endpoint", peer_cfg.pubkey);
Command::new("wg") Command::new("wg")
.args([ .args([
"set", "set",
&daemon.config.interface, &peer_cfg.interface,
"peer", "peer",
&peer_cfg.pubkey, &peer_cfg.pubkey,
"allowed-ips", "allowed-ips",
&format!("{}/32", peer_cfg.address), "::/0,0.0.0.0/0"
]) ])
.output()?; .output()?;
} }