From 941942aedae347a5aeb2a4c63c3443603dd26bbb Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 22 May 2020 22:01:27 +0200 Subject: [PATCH] WIP igd actor --- src/igd_actor.rs | 54 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/src/igd_actor.rs b/src/igd_actor.rs index 4e2f4b6..1263902 100644 --- a/src/igd_actor.rs +++ b/src/igd_actor.rs @@ -1,16 +1,28 @@ use igd::aio::*; +use igd::PortMappingProtocol; +use std::net::SocketAddrV4; use log::*; use anyhow::{Result, Context}; -use tokio::sync::watch; +use tokio::{ + select, + sync::watch, + time::{ + self, + Duration +}}; use crate::messages; pub struct IgdActor { + last_ports: messages::PublicExposedPorts, rx_ports: watch::Receiver, gateway: Gateway, + refresh: Duration, + expire: Duration, + private_ip: String } impl IgdActor { - pub async fn new(rxp: &watch::Receiver) -> Result { + pub async fn new(priv_ip: &str, refresh: Duration, expire: Duration, rxp: &watch::Receiver) -> Result { let gw = search_gateway(Default::default()) .await .context("Failed to find gateway")?; @@ -18,16 +30,48 @@ impl IgdActor { let ctx = Self { gateway: gw, - rx_ports: rxp.clone() + rx_ports: rxp.clone(), + private_ip: priv_ip.to_string(), + refresh: refresh, + expire: expire }; return Ok(ctx); } pub async fn listen(&mut self) -> Result<()> { - while let Some(ports) = self.rx_ports.recv().await { - println!("received = {:#?}", ports); + let mut interval = time::interval(self.refresh); + loop { + let res = select! { + msg = self.rx_ports.recv() => match msg { + Some(ports) => { self.last_ports = ports ; return self.do_igd().await; } , + None => return Ok(()) // Sender dropped, terminate loop. + } + _ = interval.tick() => self.do_igd().await + }; + + match res { + Ok(()) => debug!("Successfully updated IGD"), + Err(e) => error!("An error occured while updating IGD. {}. {:#?}", e, self.last_ports), + } } + } + + pub async fn do_igd(&self) -> Result<()> { + let actions = [ + (PortMappingProtocol::TCP, self.last_ports.tcp_ports), + (PortMappingProtocol::UDP, self.last_ports.udp_ports) + ]; + + for (proto, list) in &actions { + for port in list { + let service_str = format!("{}:{}", self.private_ip, port); + let service: SocketAddrV4 = service_str.parse()?.context("Invalid socket address"); + self.gateway.add_port(*proto, *port, service, self.expire.as_secs() as u32, "diplonat").await?; + debug!("IGD request successful for {:#?} {}", proto, service); + } + } + return Ok(()); } }