2022-12-07 14:35:12 +00:00
use std ::collections ::{ HashMap , HashSet } ;
use std ::fmt ;
2023-04-05 13:38:16 +00:00
use std ::net ::{ Ipv4Addr , Ipv6Addr } ;
2022-12-07 14:35:12 +00:00
use std ::sync ::Arc ;
2023-04-05 13:38:16 +00:00
use std ::time ::{ Duration , SystemTime } ;
2022-12-07 14:35:12 +00:00
2023-04-05 13:38:16 +00:00
use anyhow ::Result ;
use serde ::{ Deserialize , Serialize } ;
2023-02-02 15:30:00 +00:00
use tokio ::{ select , sync ::watch } ;
2022-12-11 14:46:48 +00:00
use tracing ::* ;
2022-12-07 14:35:12 +00:00
use df_consul ::* ;
2023-04-05 13:38:16 +00:00
const IP_TARGET_METADATA_TAG_PREFIX : & str = " public_ " ;
2022-12-07 14:35:12 +00:00
const CNAME_TARGET_METADATA_TAG : & str = " cname_target " ;
2023-04-05 13:38:16 +00:00
const AUTODISCOVERY_CACHE_DURATION : u64 = 600 ; // 10 minutes
2022-12-07 14:35:12 +00:00
// ---- Extract DNS config from Consul catalog ----
2023-01-11 21:44:21 +00:00
#[ derive(Debug, Eq, PartialEq, Default) ]
2022-12-07 14:35:12 +00:00
pub struct DnsConfig {
pub entries : HashMap < DnsEntryKey , DnsEntryValue > ,
}
2023-01-11 21:35:56 +00:00
#[ derive(Clone, Debug, Hash, PartialEq, Eq) ]
2022-12-07 14:35:12 +00:00
pub struct DnsEntryKey {
2022-12-11 15:29:06 +00:00
pub dns_path : String ,
2022-12-07 14:35:12 +00:00
pub record_type : DnsRecordType ,
}
#[ derive(Debug, PartialEq, Eq) ]
pub struct DnsEntryValue {
pub targets : HashSet < String > ,
}
2023-01-11 21:35:56 +00:00
#[ derive(Clone, Copy, Debug, Hash, PartialEq, Eq) ]
2022-12-07 14:35:12 +00:00
#[ allow(clippy::upper_case_acronyms) ]
pub enum DnsRecordType {
A ,
AAAA ,
CNAME ,
}
impl DnsConfig {
pub fn new ( ) -> Self {
Self {
entries : HashMap ::new ( ) ,
}
}
fn add ( & mut self , k : DnsEntryKey , v : DnsEntryValue ) {
if let Some ( ent ) = self . entries . get_mut ( & k ) {
ent . targets . extend ( v . targets ) ;
} else {
self . entries . insert ( k , v ) ;
}
}
}
2023-04-05 13:38:16 +00:00
// ---- fetcher and autodiscovery cache ----
2022-12-07 14:35:12 +00:00
2023-04-05 13:38:16 +00:00
pub fn spawn_dns_config_task (
consul : Consul ,
must_exit : watch ::Receiver < bool > ,
) -> watch ::Receiver < Arc < DnsConfig > > {
let ( tx , rx ) = watch ::channel ( Arc ::new ( DnsConfig ::new ( ) ) ) ;
let fetcher = DnsConfigFetcher {
consul ,
node_ipv4_cache : HashMap ::new ( ) ,
node_ipv6_cache : HashMap ::new ( ) ,
2022-12-07 14:35:12 +00:00
} ;
2023-04-05 13:38:16 +00:00
tokio ::spawn ( fetcher . task ( tx , must_exit ) ) ;
rx
2022-12-07 14:35:12 +00:00
}
2023-04-05 13:38:16 +00:00
struct DnsConfigFetcher {
consul : Consul ,
node_ipv4_cache : HashMap < String , ( u64 , Option < Ipv4Addr > ) > ,
node_ipv6_cache : HashMap < String , ( u64 , Option < Ipv6Addr > ) > ,
}
2022-12-07 14:35:12 +00:00
2023-04-05 13:38:16 +00:00
impl DnsConfigFetcher {
async fn task (
mut self ,
tx : watch ::Sender < Arc < DnsConfig > > ,
mut must_exit : watch ::Receiver < bool > ,
) {
let mut catalog_rx = self
. consul
. watch_all_service_health ( Duration ::from_secs ( 60 ) ) ;
2022-12-07 14:35:12 +00:00
while ! * must_exit . borrow ( ) {
2023-02-02 15:30:00 +00:00
select! {
_ = catalog_rx . changed ( ) = > ( ) ,
2022-12-07 14:35:12 +00:00
_ = must_exit . changed ( ) = > continue ,
} ;
2023-04-05 13:38:16 +00:00
let services = catalog_rx . borrow_and_update ( ) . clone ( ) ;
match self . parse_catalog ( & services ) . await {
Ok ( dns_config ) = > tx . send ( Arc ::new ( dns_config ) ) . expect ( " Internal error " ) ,
Err ( e ) = > {
error! ( " Error when parsing tags: {} " , e ) ;
}
} ;
}
}
2022-12-07 14:35:12 +00:00
2023-04-05 13:38:16 +00:00
async fn parse_catalog ( & mut self , services : & catalog ::AllServiceHealth ) -> Result < DnsConfig > {
let mut dns_config = DnsConfig ::new ( ) ;
for ( _svc , nodes ) in services . iter ( ) {
for node in nodes . iter ( ) {
// Do not take into account backends if any have status critical
if node . checks . iter ( ) . any ( | x | x . status = = " critical " ) {
continue ;
}
for tag in node . service . tags . iter ( ) {
if let Some ( ( k , v ) ) = self . parse_d53_tag ( tag , & node . node ) . await ? {
dns_config . add ( k , v ) ;
2023-02-02 15:30:00 +00:00
}
2023-04-05 13:38:16 +00:00
}
}
}
Ok ( dns_config )
}
async fn parse_d53_tag (
& mut self ,
tag : & str ,
node : & catalog ::Node ,
) -> Result < Option < ( DnsEntryKey , DnsEntryValue ) > > {
let splits = tag . split ( ' ' ) . collect ::< Vec < _ > > ( ) ;
if splits . len ( ) ! = 2 {
return Ok ( None ) ;
}
2023-04-05 14:02:32 +00:00
let ( record_type , target ) = match splits [ 0 ] {
2023-04-05 13:38:16 +00:00
" d53-a " = > match self . get_node_ipv4 ( & node ) . await ? {
2023-04-05 14:02:32 +00:00
Some ( tgt ) = > ( DnsRecordType ::A , tgt . to_string ( ) ) ,
2023-04-05 13:38:16 +00:00
None = > {
warn! ( " Got d53-a tag `{}` but node {} does not appear to have a known public IPv4 address. Tag is ignored. " , tag , node . node ) ;
return Ok ( None ) ;
}
} ,
" d53-aaaa " = > match self . get_node_ipv6 ( & node ) . await ? {
2023-04-05 14:02:32 +00:00
Some ( tgt ) = > ( DnsRecordType ::AAAA , tgt . to_string ( ) ) ,
2023-04-05 13:38:16 +00:00
None = > {
warn! ( " Got d53-aaaa tag `{}` but node {} does not appear to have a known public IPv6 address. Tag is ignored. " , tag , node . node ) ;
return Ok ( None ) ;
}
} ,
" d53-cname " = > match node . meta . get ( CNAME_TARGET_METADATA_TAG ) {
2023-04-05 14:02:32 +00:00
Some ( tgt ) = > ( DnsRecordType ::CNAME , tgt . to_string ( ) ) ,
2023-04-05 13:38:16 +00:00
None = > {
warn! ( " Got d53-cname tag `{}` but node {} does not have a {} metadata value. Tag is ignored. " , tag , node . node , CNAME_TARGET_METADATA_TAG ) ;
return Ok ( None ) ;
}
} ,
_ = > return Ok ( None ) ,
} ;
Ok ( Some ( (
DnsEntryKey {
dns_path : splits [ 1 ] . to_string ( ) ,
record_type ,
} ,
2023-04-05 14:02:32 +00:00
DnsEntryValue {
targets : [ target ] . into_iter ( ) . collect ( ) ,
} ,
2023-04-05 13:38:16 +00:00
) ) )
}
async fn get_node_ipv4 ( & mut self , node : & catalog ::Node ) -> Result < Option < Ipv4Addr > > {
Self ::get_node_ip ( & self . consul , " ipv4 " , & mut self . node_ipv4_cache , node ) . await
}
async fn get_node_ipv6 ( & mut self , node : & catalog ::Node ) -> Result < Option < Ipv6Addr > > {
Self ::get_node_ip ( & self . consul , " ipv6 " , & mut self . node_ipv6_cache , node ) . await
}
async fn get_node_ip < A > (
consul : & Consul ,
family : & str ,
cache : & mut HashMap < String , ( u64 , Option < A > ) > ,
node : & catalog ::Node ,
) -> Result < Option < A > >
where
A : Serialize + for < ' de > Deserialize < ' de > + std ::fmt ::Debug + std ::str ::FromStr + Copy + Eq ,
< A as std ::str ::FromStr > ::Err : Send + Sync + std ::error ::Error + 'static ,
{
match cache . get ( & node . node ) {
Some ( ( t , a ) ) if timestamp ( ) < = t + AUTODISCOVERY_CACHE_DURATION = > Ok ( * a ) ,
_ = > {
let kv_key = format! ( " diplonat/autodiscovery/ {} / {} " , family , node . node ) ;
let autodiscovery = consul . kv_get ( & kv_key ) . await ? ;
if let Some ( json ) = autodiscovery {
let a = serde_json ::from_slice ::< DiplonatAutodiscoveryResult < A > > ( & json ) ? ;
if timestamp ( ) < = a . timestamp + AUTODISCOVERY_CACHE_DURATION {
if cache . get ( & node . node ) . map ( | x | x . 1 ) ! = Some ( a . address ) {
info! (
" Got {} address for {} from diplonat autodiscovery: {:?} " ,
family , node . node , a . address
) ;
2022-12-07 14:35:12 +00:00
}
2023-04-05 13:38:16 +00:00
cache . insert ( node . node . clone ( ) , ( a . timestamp , a . address ) ) ;
return Ok ( a . address ) ;
} else {
warn! ( " {} address for {} from diplonat autodiscovery is outdated (value: {:?}), falling back on value from Consul node meta " , family , node . node , a . address ) ;
2022-12-07 14:35:12 +00:00
}
}
2023-04-05 13:38:16 +00:00
let meta_tag = format! ( " {} {} " , IP_TARGET_METADATA_TAG_PREFIX , family ) ;
let a = node . meta . get ( & meta_tag ) . map ( | x | x . parse ( ) ) . transpose ( ) ? ;
if cache . get ( & node . node ) . map ( | x | x . 1 ) ! = Some ( a ) {
info! (
" Got {} address for {} from Consul node meta: {:?} " ,
family , node . node , a
) ;
}
cache . insert ( node . node . clone ( ) , ( timestamp ( ) , a ) ) ;
Ok ( a )
}
2022-12-07 14:35:12 +00:00
}
2023-04-05 13:38:16 +00:00
}
}
2022-12-07 14:35:12 +00:00
2023-04-05 13:38:16 +00:00
// ---- util for interaction with diplonat ----
#[ derive(Serialize, Deserialize, Debug) ]
pub struct DiplonatAutodiscoveryResult < A > {
pub timestamp : u64 ,
pub address : Option < A > ,
}
fn timestamp ( ) -> u64 {
SystemTime ::now ( )
. duration_since ( SystemTime ::UNIX_EPOCH )
. expect ( " clock error " )
. as_secs ( )
2022-12-07 14:35:12 +00:00
}
// ---- Display impls ----
impl std ::fmt ::Display for DnsRecordType {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
match self {
DnsRecordType ::A = > write! ( f , " A " ) ,
DnsRecordType ::AAAA = > write! ( f , " AAAA " ) ,
DnsRecordType ::CNAME = > write! ( f , " CNAME " ) ,
}
}
}
impl std ::fmt ::Display for DnsEntryKey {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
2022-12-11 15:29:06 +00:00
write! ( f , " {} IN {} " , self . dns_path , self . record_type )
2022-12-07 14:35:12 +00:00
}
}
impl std ::fmt ::Display for DnsEntryValue {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
write! ( f , " [ " ) ? ;
for ( i , tgt ) in self . targets . iter ( ) . enumerate ( ) {
if i > 0 {
write! ( f , " " ) ? ;
}
write! ( f , " {} " , tgt ) ? ;
}
write! ( f , " ] " )
}
}