diff --git a/src/diplonat.rs b/src/diplonat.rs index 565c567..1be00f7 100644 --- a/src/diplonat.rs +++ b/src/diplonat.rs @@ -14,7 +14,12 @@ impl Diplonat { pub async fn new() -> Result { let env = Environment::new()?; let ca = ConsulActor::new(&env.consul_url, &env.consul_node_name); - let ia = IgdActor::new(&ca.rx_open_ports).await?; + let ia = IgdActor::new( + &env.private_ip, + env.refresh_time, + env.expiration_time, + &ca.rx_open_ports + ).await?; let ctx = Self { consul: ca, diff --git a/src/igd_actor.rs b/src/igd_actor.rs index 1263902..68d20df 100644 --- a/src/igd_actor.rs +++ b/src/igd_actor.rs @@ -33,7 +33,8 @@ impl IgdActor { rx_ports: rxp.clone(), private_ip: priv_ip.to_string(), refresh: refresh, - expire: expire + expire: expire, + last_ports: messages::PublicExposedPorts::new() }; return Ok(ctx); @@ -42,31 +43,34 @@ impl IgdActor { pub async fn listen(&mut self) -> Result<()> { let mut interval = time::interval(self.refresh); loop { - let res = select! { - msg = self.rx_ports.recv() => match msg { - Some(ports) => { self.last_ports = ports ; return self.do_igd().await; } , - None => return Ok(()) // Sender dropped, terminate loop. - } - _ = interval.tick() => self.do_igd().await + // 1. Wait for an event + let new_ports = select! { + Some(ports) = self.rx_ports.recv() => Some(ports), + _ = interval.tick() => None, + else => return Ok(()) // Sender dropped, terminate loop. }; - match res { + // 2. Update last ports if needed + if let Some(p) = new_ports { self.last_ports = p; } + + // 3. Flush IGD requests + match self.do_igd().await { Ok(()) => debug!("Successfully updated IGD"), - Err(e) => error!("An error occured while updating IGD. {}. {:#?}", e, self.last_ports), + Err(e) => error!("An error occured while updating IGD. {}", e), } } } pub async fn do_igd(&self) -> Result<()> { let actions = [ - (PortMappingProtocol::TCP, self.last_ports.tcp_ports), - (PortMappingProtocol::UDP, self.last_ports.udp_ports) + (PortMappingProtocol::TCP, &self.last_ports.tcp_ports), + (PortMappingProtocol::UDP, &self.last_ports.udp_ports) ]; - for (proto, list) in &actions { - for port in list { + for (proto, list) in actions.iter() { + for port in *list { let service_str = format!("{}:{}", self.private_ip, port); - let service: SocketAddrV4 = service_str.parse()?.context("Invalid socket address"); + let service = service_str.parse::().context("Invalid socket address")?; self.gateway.add_port(*proto, *port, service, self.expire.as_secs() as u32, "diplonat").await?; debug!("IGD request successful for {:#?} {}", proto, service); } diff --git a/src/messages.rs b/src/messages.rs index 46cc4c5..31ed48f 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -3,3 +3,12 @@ pub struct PublicExposedPorts { pub tcp_ports: Vec, pub udp_ports: Vec } + +impl PublicExposedPorts { + pub fn new() -> Self { + return Self { + tcp_ports: Vec::new(), + udp_ports: Vec::new() + } + } +}