use std::collections::HashSet; use std::net::{Ipv4Addr, Ipv6Addr}; use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Result}; use tokio::select; use tokio::sync::watch; use tracing::*; use crate::dns_config::*; use crate::DomainProvider; const RETRY_DELAY: Duration = Duration::from_secs(600); // 10 minutes pub async fn dns_updater_task( mut rx_dns_config: watch::Receiver>, providers: Vec, allowed_domains: Vec, mut must_exit: watch::Receiver, ) { for dom in allowed_domains.iter() { info!(domain = dom, "allowing subdomains of domain"); } for prov in providers.iter() { info!( domain = prov.domain, provider = prov.provider.provider(), "got provider for domain" ); } info!("DNS updater starting"); let mut config = Arc::new(DnsConfig::new()); let mut failures = HashSet::new(); while !*must_exit.borrow() { select!( c = rx_dns_config.changed() => { if c.is_err() { break; } } _ = tokio::time::sleep(RETRY_DELAY) => { if failures.is_empty() { continue; } } _ = must_exit.changed() => continue, ); // Always lag 15 seconds behind actual updates, // to avoid sending too many at once and hitting rate limits tokio::time::sleep(Duration::from_secs(15)).await; let new_config: Arc = rx_dns_config.borrow_and_update().clone(); let mut new_failures = HashSet::new(); for (key, value) in new_config.entries.iter() { if failures.contains(key) { info!( record = key.to_string(), target = value.to_string(), "retrying after failure" ); } else if config.entries.get(key) == Some(value) { // Skip entries that haven't changed, and that were // successfully updated on the previous iteration continue; } // Skip entries for unallowed domains if !allowed_domains .iter() .any(|d| key.dns_path == *d || key.dns_path.ends_with(&format!(".{}", d))) { error!( domain = key.dns_path, "domain/subdomain/hostname not in allowed list", ); continue; } let provider = providers.iter().find(|p| { key.dns_path == p.domain || key.dns_path.ends_with(&format!(".{}", p.domain)) }); if let Some(provider) = provider { if let Err(e) = update_dns_entry(key, value, provider).await { error!( record = key.to_string(), target = value.to_string(), error = e.to_string(), "unable to update record, will retry later" ); new_failures.insert(key.clone()); } } else { error!( domain = key.dns_path, "no provider matches this domain/subdomain/hostname" ); } } config = new_config; failures = new_failures; } } async fn update_dns_entry( key: &DnsEntryKey, value: &DnsEntryValue, provider: &DomainProvider, ) -> Result<()> { let subdomain = if key.dns_path == provider.domain { None } else { Some( key.dns_path .strip_suffix(&format!(".{}", provider.domain)) .unwrap(), ) }; info!( record = key.to_string(), target = value.to_string(), domain = provider.domain, subdomain = &subdomain, provider = provider.provider.provider(), "updating record" ); if value.targets.is_empty() { bail!("zero targets (internal error)"); } match key.record_type { DnsRecordType::A => { let mut targets = vec![]; for tgt in value.targets.iter() { targets.push( tgt.parse::() .map_err(|_| anyhow!("Invalid ipv4 address: {}", tgt))?, ); } provider .provider .update_a(&provider.domain, subdomain, &targets) .await?; } DnsRecordType::AAAA => { let mut targets = vec![]; for tgt in value.targets.iter() { targets.push( tgt.parse::() .map_err(|_| anyhow!("Invalid ipv6 address: {}", tgt))?, ); } provider .provider .update_aaaa(&provider.domain, subdomain, &targets) .await?; } DnsRecordType::CNAME => { let mut targets = value.targets.iter().cloned().collect::>(); if targets.len() > 1 { targets.sort(); warn!( record = key.to_string(), all_targets = value.to_string(), selected_target = targets[0], "Several CNAME targets, taking first one in alphabetical order. Consider switching to a single global target instead." ); } provider .provider .update_cname(&provider.domain, subdomain, &targets[0]) .await?; } } Ok(()) }