From d2ae084fc1be2671c2a301e689c8632576922785 Mon Sep 17 00:00:00 2001 From: darkgallium Date: Sun, 24 May 2020 20:40:49 +0200 Subject: [PATCH] add actor for firewall & massive refactor --- src/consul_actor.rs | 13 ++++---- src/diplonat.rs | 15 +++++++-- src/fw.rs | 37 ++++++++++++--------- src/fw_actor.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 4 +-- src/messages.rs | 12 ++++--- 6 files changed, 129 insertions(+), 32 deletions(-) create mode 100644 src/fw_actor.rs diff --git a/src/consul_actor.rs b/src/consul_actor.rs index 1cbb1b8..ba5d704 100644 --- a/src/consul_actor.rs +++ b/src/consul_actor.rs @@ -8,11 +8,12 @@ use serde::{Serialize, Deserialize}; use serde_lexpr::{from_str,error}; use crate::messages; use crate::consul; +use std::collections::HashSet; #[derive(Serialize, Deserialize, Debug)] pub enum DiplonatParameter { - tcp_port(Vec), - udp_port(Vec) + tcp_port(HashSet), + udp_port(HashSet) } #[derive(Serialize, Deserialize, Debug)] @@ -53,8 +54,8 @@ fn to_parameters(catalog: &consul::CatalogNode) -> Vec { fn to_open_ports(params: &Vec) -> messages::PublicExposedPorts { let mut op = messages::PublicExposedPorts { - tcp_ports: Vec::new(), - udp_ports: Vec::new() + tcp_ports: HashSet::new(), + udp_ports: HashSet::new() }; for conf in params { @@ -73,8 +74,8 @@ fn to_open_ports(params: &Vec) -> messages::PublicExposedPorts { impl ConsulActor { pub fn new(url: &str, node: &str) -> Self { let (tx, rx) = watch::channel(messages::PublicExposedPorts{ - tcp_ports: Vec::new(), - udp_ports: Vec::new() + tcp_ports: HashSet::new(), + udp_ports: HashSet::new() }); return Self { diff --git a/src/diplonat.rs b/src/diplonat.rs index 1be00f7..7b7bbb8 100644 --- a/src/diplonat.rs +++ b/src/diplonat.rs @@ -4,10 +4,12 @@ use tokio::try_join; use crate::consul_actor::ConsulActor; use crate::igd_actor::IgdActor; use crate::environment::Environment; +use crate::fw_actor::FirewallActor; pub struct Diplonat { consul: ConsulActor, - igd: IgdActor + igd: IgdActor, + firewall: FirewallActor } impl Diplonat { @@ -21,9 +23,15 @@ impl Diplonat { &ca.rx_open_ports ).await?; + let fw = FirewallActor::new( + env.refresh_time, + &ca.rx_open_ports + ).await?; + let ctx = Self { consul: ca, - igd: ia + igd: ia, + firewall: fw }; return Ok(ctx); @@ -32,7 +40,8 @@ impl Diplonat { pub async fn listen(&mut self) -> Result<()> { try_join!( self.consul.listen(), - self.igd.listen() + self.igd.listen(), + self.firewall.listen() )?; return Ok(()); diff --git a/src/fw.rs b/src/fw.rs index 7650b3a..955425a 100644 --- a/src/fw.rs +++ b/src/fw.rs @@ -2,13 +2,7 @@ use iptables; use regex::Regex; use std::collections::HashSet; use std::io; - - -#[derive(PartialEq,Eq,Debug,Hash)] -pub struct Port { - proto: String, - number: u16, -} +use crate::messages; #[derive(Debug)] pub struct FirewallError(String); @@ -17,26 +11,34 @@ impl From for FirewallError { fn from(error: iptables::error::IPTError) -> Self { FirewallError(error.to_string()) } - } pub fn setup(ipt: &iptables::IPTables) -> Result<(), FirewallError> { + ipt.new_chain("filter", "DIPLONAT")?; ipt.insert("filter", "INPUT", "-j DIPLONAT", 1)?; + Ok(()) } -pub fn open_ports(ipt: &iptables::IPTables, ports: Vec) -> Result<(), FirewallError> { +pub fn open_ports(ipt: &iptables::IPTables, ports: messages::PublicExposedPorts) -> Result<(), FirewallError> { - for p in ports { - ipt.append("filter", "DIPLONAT", &format!("-p {} --dport {} -j ACCEPT", p.proto, p.number))?; + for p in ports.tcp_ports { + ipt.append("filter", "DIPLONAT", &format!("-p tcp --dport {} -j ACCEPT", p))?; + } + + for p in ports.udp_ports { + ipt.append("filter", "DIPLONAT", &format!("-p udp --dport {} -j ACCEPT", p))?; } Ok(()) } -pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result, FirewallError> { - let mut opened_ports: HashSet = HashSet::new(); +pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result { + let mut ports = messages::PublicExposedPorts { + tcp_ports: HashSet::new(), + udp_ports: HashSet::new() + }; let list = ipt.list("filter", "DIPLONAT")?; let re = Regex::new(r"\-A.*? \-p (\w+).*\-\-dport (\d+).*?\-j ACCEPT").unwrap(); @@ -50,13 +52,18 @@ pub fn get_opened_ports(ipt: &iptables::IPTables) -> Result, Firew let proto = String::from(raw_proto.as_str()); let number = String::from(raw_port.as_str()).parse::().unwrap(); - opened_ports.insert( Port { proto, number } ); + if proto == "tcp" { + ports.tcp_ports.insert(number); + } else { + ports.udp_ports.insert(number); + } + }, _ => {} } } - Ok(opened_ports) + Ok(ports) } pub fn cleanup(ipt: &iptables::IPTables) -> Result<(), FirewallError> { diff --git a/src/fw_actor.rs b/src/fw_actor.rs new file mode 100644 index 0000000..9bc6610 --- /dev/null +++ b/src/fw_actor.rs @@ -0,0 +1,80 @@ +use igd::aio::*; +use igd::PortMappingProtocol; +use std::net::SocketAddrV4; +use log::*; +use anyhow::{Result, Context}; +use tokio::{ + select, + sync::watch, + time::{ + self, + Duration +}}; + +use iptables; +use crate::messages; +use crate::fw; +use std::collections::HashSet; + +pub struct FirewallActor { + ipt: iptables::IPTables, + rx_ports: watch::Receiver, + last_ports: messages::PublicExposedPorts, + refresh: Duration +} + +impl FirewallActor { + pub async fn new(_refresh: Duration, rxp: &watch::Receiver) -> Result { + + + let ctx = Self { + ipt: iptables::new(false).unwrap(), + rx_ports: rxp.clone(), + last_ports: messages::PublicExposedPorts::new(), + refresh: _refresh, + }; + + fw::setup(&ctx.ipt).expect("iptables setup failed"); + + return Ok(ctx); + } + + pub async fn listen(&mut self) -> Result<()> { + let mut interval = time::interval(self.refresh); + loop { + // 1. Wait for an event + let new_ports = select! { + Some(ports) = self.rx_ports.recv() => Some(ports), + _ = interval.tick() => None, + else => return Ok(()) // Sender dropped, terminate loop. + }; + + // 2. Update last ports if needed + if let Some(p) = new_ports { self.last_ports = p; } + + // 3. Update firewall rules + match self.do_fw_update().await { + Ok(()) => debug!("Successfully updated firewall rules"), + Err(e) => error!("An error occured while updating firewall rules. {}", e), + } + } + } + + pub async fn do_fw_update(&self) -> Result<()> { + + let curr_opened_ports = fw::get_opened_ports(&self.ipt).unwrap(); + + let diff_tcp = self.last_ports.tcp_ports.difference(&curr_opened_ports.tcp_ports).copied().collect::>(); + let diff_udp = self.last_ports.udp_ports.difference(&curr_opened_ports.udp_ports).copied().collect::>(); + + let ports_to_open = messages::PublicExposedPorts { + tcp_ports: diff_tcp, + udp_ports: diff_udp + }; + + fw::open_ports(&self.ipt, ports_to_open).unwrap(); + + return Ok(()); + } + +} diff --git a/src/main.rs b/src/main.rs index bf8248d..e845017 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod consul_actor; mod igd_actor; mod diplonat; mod fw; +mod fw_actor; use iptables; use log::*; @@ -15,9 +16,6 @@ async fn main() { pretty_env_logger::init(); info!("Starting Diplonat"); - let ipt = iptables::new(false).unwrap(); - fw::setup(&ipt).expect("iptables setup failed"); - let mut diplo = Diplonat::new().await.expect("Setup failed"); diplo.listen().await.expect("A runtime error occured"); } diff --git a/src/messages.rs b/src/messages.rs index 31ed48f..09a7c14 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -1,14 +1,16 @@ -#[derive(Debug, Clone)] +use std::collections::HashSet; + +#[derive(Debug, Clone, PartialEq, Eq)] pub struct PublicExposedPorts { - pub tcp_ports: Vec, - pub udp_ports: Vec + pub tcp_ports: HashSet, + pub udp_ports: HashSet } impl PublicExposedPorts { pub fn new() -> Self { return Self { - tcp_ports: Vec::new(), - udp_ports: Vec::new() + tcp_ports: HashSet::new(), + udp_ports: HashSet::new() } } }