2019-08-09 15:01:28 +00:00
# include <sys/timerfd.h>
# include "algo_utils.h"
# include "utils.h"
# include "url.h"
# include "proxy.h"
# include "timer.h"
2019-09-16 12:40:58 +00:00
# include "proxy.h"
2019-08-13 15:07:52 +00:00
2019-08-27 08:10:17 +00:00
/*
 * Measure the time elapsed since *prev_time on CLOCK_MONOTONIC.
 *
 * prev_time: previous reading; overwritten with the current reading.
 * max:       upper bound of the returned value.
 *
 * Returns the elapsed time in milliseconds, saturated to max.
 * A negative difference (prev_time ahead of the clock) also yields max,
 * which is what the former unsigned conversion produced.
 * Terminates the process if clock_gettime fails.
 */
uint64_t compute_delta(struct timespec *prev_time, uint64_t max) {
  struct timespec curr;

  if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1) {
    perror(" clock_gettime error ");
    exit(EXIT_FAILURE);
  }

  // Work on 64 bits: with an int, secs * 1000 overflows as soon as the
  // gap exceeds ~24.8 days (e.g. first call with a zeroed prev_time).
  int64_t secs = (int64_t) curr.tv_sec - (int64_t) prev_time->tv_sec;
  int64_t nsecs = (int64_t) curr.tv_nsec - (int64_t) prev_time->tv_nsec;
  *prev_time = curr;

  int64_t elapsed_ms = secs * 1000 + nsecs / 1000000;
  if (elapsed_ms < 0 || (uint64_t) elapsed_ms > max) return max;
  return (uint64_t) elapsed_ms;
}
2019-08-28 08:50:34 +00:00
int is_blacklisted ( struct thunder_ctx * thunderc , int link_id ) {
return thunderc - > blacklisted [ link_id ] > = thunderc - > received_pkts_on_link [ link_id ] ;
}
2019-08-09 15:01:28 +00:00
void prepare ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
2019-08-13 15:07:52 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
2019-08-27 12:31:27 +00:00
thunderc - > emit_id + + ;
2019-08-13 15:07:52 +00:00
union abstract_packet metadata = {
. fmt . headers . cmd = CMD_UDP_METADATA_THUNDER ,
. fmt . headers . size = sizeof ( metadata . fmt . headers ) + sizeof ( metadata . fmt . content . udp_metadata_thunder ) ,
. fmt . headers . flags = 0 ,
. fmt . content . udp_metadata_thunder . id = thunderc - > emit_id ,
} ;
2019-08-26 10:08:31 +00:00
buffer_append_ap ( bp , & metadata ) ;
2019-08-28 14:33:43 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] UDP metadata added \n " ) ;
2019-08-09 15:01:28 +00:00
}
void pad ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
2019-08-27 08:10:17 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
uint64_t ref = 0l + thunderc - > emit_id ;
2019-08-09 15:01:28 +00:00
2019-08-27 15:28:14 +00:00
dup_buffer_toa ( & app_ctx - > br , bp , ( void * ) ref ) ;
// 1. Clean old buffers (we keep only thunderc->total_links buffer, keeping more would be useless)
if ( ref > thunderc - > total_links & & get_app_buffer ( & app_ctx - > br , ( void * ) ( ref - thunderc - > total_links ) ) ) {
mv_buffer_atof ( & app_ctx - > br , ( void * ) ( ref - thunderc - > total_links ) ) ;
2019-08-27 08:10:17 +00:00
}
2019-08-27 12:31:27 +00:00
// 2. Append abstract packets stored in our buffers
uint64_t add_ref = ref ;
while ( 1 ) {
if ( add_ref < 1 ) break ;
add_ref - - ;
struct buffer_packet * bp_iter = get_app_buffer ( & app_ctx - > br , ( void * ) add_ref ) ;
if ( bp_iter = = NULL ) break ;
union abstract_packet * ap = buffer_first_ap ( bp_iter ) ;
2019-08-28 14:33:43 +00:00
if ( ap - > fmt . headers . cmd ! = CMD_UDP_ENCAPSULATED ) {
fprintf ( stderr , " Invalid buffer payload! \n " ) ;
exit ( EXIT_FAILURE ) ;
}
union abstract_packet * ap_meta = ap_next ( ap ) ;
if ( ap_meta - > fmt . headers . cmd ! = CMD_UDP_METADATA_THUNDER ) {
fprintf ( stderr , " Invalid buffer metadata! \n " ) ;
2019-08-27 12:31:27 +00:00
exit ( EXIT_FAILURE ) ;
}
2019-08-28 14:33:43 +00:00
if ( buffer_full_size ( bp ) + ap - > fmt . headers . size + ap_meta - > fmt . headers . size > TOR_CELL_SIZE - thunderc - > monit_pkt_size ) break ;
2019-08-27 12:31:27 +00:00
buffer_append_ap ( bp , ap ) ;
2019-08-28 14:33:43 +00:00
buffer_append_ap ( bp , ap_meta ) ;
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] Pad packet (now %ld bytes) \n " , buffer_full_size ( bp ) ) ;
2019-08-27 08:10:17 +00:00
}
2019-08-09 15:01:28 +00:00
}
int schedule ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
2019-08-29 15:53:49 +00:00
char url [ 256 ] ;
2019-08-26 10:08:31 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
struct evt_core_fdinfo * to_fdinfo = NULL ;
struct evt_core_cat * cat = evt_core_get_from_cat ( ctx , " tcp-write " ) ;
2019-09-14 13:44:30 +00:00
uint8_t protect = thunderc - > total_links ;
2019-08-26 15:35:23 +00:00
do {
// 1. We choose the link
2019-08-28 14:33:43 +00:00
if ( cat - > socklist - > len = = 0 ) {
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] No link available, packet will be dropped \n " ) ;
break ;
}
2019-08-29 15:53:49 +00:00
to_fdinfo = NULL ;
do {
thunderc - > selected_link = ( thunderc - > selected_link + 1 ) % thunderc - > total_links ;
sprintf ( url , " tcp:write:127.0.0.1:%d " , 7500 + thunderc - > selected_link ) ;
to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
} while ( to_fdinfo = = NULL ) ;
2019-08-29 16:44:27 +00:00
//printf("URL %s has been retained\n", url);
2019-08-26 15:35:23 +00:00
// 2. We create the packet template
2019-08-26 10:08:31 +00:00
union abstract_packet links = {
. fmt . headers . cmd = CMD_LINK_MONITORING_THUNDER ,
2019-08-27 08:10:17 +00:00
. fmt . headers . size = thunderc - > monit_pkt_size ,
2019-08-26 10:08:31 +00:00
. fmt . headers . flags = 0 ,
2019-08-26 15:35:23 +00:00
. fmt . content . link_monitoring_thunder . links_status = { }
2019-08-26 10:08:31 +00:00
} ;
2019-08-26 15:35:23 +00:00
// 3. We append the template to the buffer
2019-08-26 10:08:31 +00:00
struct buffer_packet * bp_dup = dup_buffer_tow ( & app_ctx - > br , bp , to_fdinfo ) ;
2019-08-26 15:35:23 +00:00
union abstract_packet * new_ap = buffer_append_ap ( bp_dup , & links ) ;
// 4. We compute the time difference
2019-09-16 12:40:58 +00:00
uint64_t mili_sec = 0 ;
if ( protect = = thunderc - > total_links )
mili_sec = compute_delta ( & thunderc - > prev_link_time , UINT16_MAX ) ;
2019-08-26 15:35:23 +00:00
2019-09-16 13:06:57 +00:00
//printf("send packet on link %d with delta=%ld\n", thunderc->selected_link, mili_sec);
2019-08-26 15:35:23 +00:00
// 5. We create the array
struct link_info * li = & new_ap - > fmt . content . link_monitoring_thunder . links_status ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
2019-09-16 12:40:58 +00:00
if ( thunderc - > sent_pkts_on_link [ i ] = = 0 ) continue ;
2019-08-26 15:35:23 +00:00
thunderc - > delta_t_per_link [ i ] + = mili_sec ;
2019-08-29 16:11:50 +00:00
li [ i ] . delta_t = thunderc - > delta_t_per_link [ i ] > UINT16_MAX ? UINT16_MAX : thunderc - > delta_t_per_link [ i ] ;
2019-08-26 15:35:23 +00:00
}
2019-08-29 16:25:37 +00:00
thunderc - > delta_t_per_link [ thunderc - > selected_link ] = 0 ;
2019-08-26 15:35:23 +00:00
li [ thunderc - > selected_link ] . delta_t = 0 ;
2019-09-16 12:40:58 +00:00
thunderc - > sent_pkts_on_link [ thunderc - > selected_link ] + + ;
2019-08-26 10:08:31 +00:00
2019-08-28 12:57:20 +00:00
if ( ctx - > verbose > 1 ) {
dump_buffer_packet ( bp_dup ) ;
fprintf ( stderr , " [algo_thunder] Will send this info \n " ) ;
}
2019-08-26 10:08:31 +00:00
main_on_tcp_write ( ctx , to_fdinfo ) ;
2019-08-29 15:53:49 +00:00
2019-09-14 13:44:30 +00:00
} while ( is_blacklisted ( thunderc , thunderc - > selected_link ) & & protect - - > 0 ) ;
if ( protect = = 0 ) {
fprintf ( stderr , " all links were blacklisted, resetting \n " ) ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
2019-09-15 13:31:10 +00:00
fprintf ( stderr , " link=%d, blacklisted=%ld, rcved=%ld \n " , i , thunderc - > blacklisted [ i ] , thunderc - > received_pkts_on_link [ i ] ) ;
2019-09-14 13:44:30 +00:00
thunderc - > received_pkts_on_link [ i ] = thunderc - > blacklisted [ i ] + 1 ;
}
}
2019-08-26 10:08:31 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] Packets sent \n " ) ;
// Release the buffer
mv_buffer_rtof ( & app_ctx - > br , fdinfo ) ;
2019-08-09 15:01:28 +00:00
return 0 ;
}
2019-09-15 13:31:10 +00:00
/* Parameters of one "block this link" request, consumed by on_block(). */
struct block_info {
// Index of the link concerned; on_block() uses it to index the per-link arrays
uint8_t i ;
// Application context; on_block() reaches the thunder context through its misc field
struct algo_ctx * app_ctx ;
// Packet count compared by on_block() against the link counters and stored as the new blacklist mark
uint64_t missing ;
// Non-zero when the structure was heap-allocated for a timeout: on_block() then frees it
uint8_t is_timeout ;
// Human readable message printed by on_block() when the link gets blocked
char reason [ 1024 ] ;
} ;
2019-08-28 08:50:34 +00:00
void on_block ( struct evt_core_ctx * ctx , void * raw ) {
struct block_info * bi = raw ;
struct thunder_ctx * thunderc = bi - > app_ctx - > misc ;
2019-09-16 13:06:57 +00:00
if ( bi - > is_timeout & & thunderc - > received_pkts_on_link [ bi - > i ] > = bi - > missing ) goto release ;
2019-08-28 08:50:34 +00:00
if ( thunderc - > blacklisted [ bi - > i ] > = bi - > missing ) goto release ;
2019-09-15 16:10:45 +00:00
printf ( " %s \n " , bi - > reason ) ;
2019-08-28 08:50:34 +00:00
thunderc - > blacklisted [ bi - > i ] = bi - > missing ;
release :
2019-09-15 13:31:10 +00:00
if ( bi - > is_timeout ) free ( bi ) ;
2019-08-28 08:50:34 +00:00
}
2019-08-30 08:39:00 +00:00
int is_in_order ( struct thunder_ctx * thunderc , uint8_t link_id ) {
uint64_t ref = thunderc - > received_pkts_on_link [ link_id ] ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
2019-08-30 08:48:23 +00:00
uint64_t expected = link_id > = i ? ref : ref - 1 ;
if ( thunderc - > received_pkts_on_link [ i ] > expected ) {
2019-08-30 09:12:20 +00:00
//printf("link_id=%d, i=%d, pkt_i=%ld, pkt_i_expected=%ld, pkt_link_id=%ld\n", link_id, i, thunderc->received_pkts_on_link[i], expected, ref);
2019-08-30 08:48:23 +00:00
return 0 ;
}
2019-08-30 08:39:00 +00:00
}
return 1 ;
}
2019-08-27 15:28:14 +00:00
void classify ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
union abstract_packet * ap = buffer_first_ap ( bp ) ;
while ( ap ! = NULL & & ap - > fmt . headers . cmd ! = CMD_LINK_MONITORING_THUNDER ) ap = ap_next ( ap ) ;
if ( ap = = NULL ) {
fprintf ( stderr , " Unable to find our packet \n " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-08-30 08:39:00 +00:00
/*
2019-08-29 15:53:49 +00:00
if ( ap - > fmt . headers . flags & FLAG_RESET ) {
for ( int i = 0 ; i < MAX_LINKS ; i + + ) thunderc - > received_pkts_on_link [ i ] = 1 ;
}
2019-08-30 08:39:00 +00:00
*/
2019-08-29 15:53:49 +00:00
2019-08-30 08:39:00 +00:00
// 1. Update link info
2019-08-27 15:28:14 +00:00
int link_id = url_get_port_int ( fdinfo - > url ) - 7500 ;
thunderc - > received_pkts_on_link [ link_id ] + + ;
2019-08-30 09:12:20 +00:00
//printf("Received %ld packets on link %d\n", thunderc->received_pkts_on_link[link_id], link_id);
2019-08-27 15:28:14 +00:00
struct link_info * li = & ap - > fmt . content . link_monitoring_thunder . links_status ;
2019-08-30 08:39:00 +00:00
uint64_t mili_sec = compute_delta ( & thunderc - > prev_rcv_link_time , UINT16_MAX ) ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
2019-09-16 13:06:57 +00:00
if ( thunderc - > received_pkts_on_link [ i ] < = 1 ) continue ;
2019-08-30 08:39:00 +00:00
thunderc - > rcv_delta_t_per_link [ i ] + = mili_sec ;
}
thunderc - > rcv_delta_t_per_link [ link_id ] = 0 ;
2019-09-16 12:40:58 +00:00
// 2.
int64_t current_owdd = 0 ;
int64_t biggest_owdd = current_owdd ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
int64_t remote_delta = 0 , local_delta = 0 , owd_difference = 0 ;
uint64_t expected = i < = link_id ? thunderc - > received_pkts_on_link [ link_id ] : thunderc - > received_pkts_on_link [ link_id ] - 1 ;
if ( thunderc - > received_pkts_on_link [ i ] ! = expected ) continue ;
2019-09-16 13:06:57 +00:00
if ( thunderc - > received_pkts_on_link [ i ] < = 1 ) continue ;
2019-09-16 12:40:58 +00:00
remote_delta = li [ i ] . delta_t ;
local_delta = thunderc - > rcv_delta_t_per_link [ i ] ;
owd_difference = local_delta - remote_delta ;
//printf("li[i].delta_t=%d, remote_delta=%ld, thunderc->rcv_delta_t_per_link[i]=%ld, local_delta=%ld, owd_difference=%ld\n", li[i].delta_t, remote_delta, thunderc->rcv_delta_t_per_link[i], local_delta, owd_difference);
//if (remote_delta > 10000 || local_delta > 10000) continue; // Too many time elapsed for useful comparison
if ( owd_difference > biggest_owdd ) {
biggest_owdd = owd_difference ;
2019-08-30 08:39:00 +00:00
}
2019-09-16 12:40:58 +00:00
//printf("----> %ld vs %ld for i=%d, link_id=%d, owd_difference=%ld, smallest=%ld, remote=%ld, local=%ld\n", thunderc->received_pkts_on_link[i], expected, i, link_id, owd_difference, smallest_owdd, local_delta, remote_delta);
2019-08-30 08:39:00 +00:00
2019-09-16 13:06:57 +00:00
//printf(" owd_difference = %ld, max=%ld, min=%ld\n", owd_difference, thunderc->allowed_jitter_ms, -thunderc->allowed_jitter_ms);
2019-09-16 12:40:58 +00:00
if ( owd_difference < = thunderc - > allowed_jitter_ms & & owd_difference > = - thunderc - > allowed_jitter_ms ) continue ;
2019-08-30 08:39:00 +00:00
2019-09-16 13:06:57 +00:00
struct block_info bi = { 0 } ;
bi . app_ctx = app_ctx ;
bi . is_timeout = 0 ;
bi . app_ctx = app_ctx ;
sprintf ( bi . reason , " Erreur " ) ;
2019-09-16 12:40:58 +00:00
if ( owd_difference < - thunderc - > allowed_jitter_ms ) {
2019-09-16 13:06:57 +00:00
bi . i = i ;
bi . missing = expected ;
sprintf ( bi . reason , " Packet Too Late - Blocked non current link %d owd_diff=%ld, local_delta=%ld, remote_delta=%ld " , i , owd_difference , local_delta , remote_delta ) ;
2019-09-16 12:40:58 +00:00
} else if ( owd_difference > thunderc - > allowed_jitter_ms ) {
2019-09-16 13:06:57 +00:00
bi . i = link_id ;
bi . missing = thunderc - > received_pkts_on_link [ link_id ] ;
sprintf ( bi . reason , " Packet Too Late - Blocked current link %d owd_diff=%ld, local_delta=%ld, remote_delta=%ld " , link_id , owd_difference , local_delta , remote_delta ) ;
2019-09-16 12:40:58 +00:00
} else {
fprintf ( stderr , " Algorithm is wrong \n " ) ;
exit ( EXIT_FAILURE ) ;
2019-08-30 08:39:00 +00:00
}
2019-09-16 12:40:58 +00:00
2019-09-16 13:06:57 +00:00
on_block ( ctx , & bi ) ;
2019-09-16 12:40:58 +00:00
}
// 2.5 Compute link jitter
2019-09-16 13:06:57 +00:00
//printf("retained owd: %ld\n", biggest_owdd);
2019-09-16 12:40:58 +00:00
int64_t current_link_jitter = biggest_owdd ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
2019-09-16 13:06:57 +00:00
thunderc - > estimated_sent [ i ] = li [ i ] . delta_t /*+ current_link_jitter*/ ;
2019-08-30 08:39:00 +00:00
}
// 3. Disable links that miss packets
2019-08-28 08:50:34 +00:00
for ( uint8_t i = 0 ; i < thunderc - > total_links ; i + + ) {
2019-08-27 15:28:14 +00:00
uint64_t expected = i < = link_id ? thunderc - > received_pkts_on_link [ link_id ] : thunderc - > received_pkts_on_link [ link_id ] - 1 ;
if ( thunderc - > received_pkts_on_link [ i ] > = expected ) continue ; // Nothing to do, all packets have been received
2019-08-28 08:50:34 +00:00
2019-09-15 06:39:47 +00:00
int64_t timeout = thunderc - > allowed_jitter_ms - li [ i ] . delta_t ;
2019-08-28 08:50:34 +00:00
struct block_info * bi = malloc ( sizeof ( struct block_info ) ) ;
2019-09-15 13:31:10 +00:00
bi - > i = i ; bi - > app_ctx = app_ctx ; bi - > missing = expected ; bi - > is_timeout = 1 ;
2019-08-28 08:50:34 +00:00
2019-09-15 16:10:45 +00:00
sprintf ( bi - > reason , " Missing Packet - Timeout for link %d after %ldms (expected: %ld, seen: %ld) " , i , timeout , expected , thunderc - > received_pkts_on_link [ i ] ) ;
2019-08-30 08:39:00 +00:00
if ( timeout < = 0 ) {
on_block ( ctx , bi ) ;
2019-08-30 09:12:20 +00:00
//printf(" Missing Packet - Blocked link %d (expected: %ld, seen: %ld)\n", i, expected, thunderc->received_pkts_on_link[i]);
2019-08-30 08:39:00 +00:00
continue ;
}
2019-08-28 16:05:56 +00:00
set_timeout ( ctx , timeout , bi , on_block ) ;
2019-09-15 13:31:10 +00:00
2019-08-28 14:33:43 +00:00
if ( ctx - > verbose > 1 ) {
fprintf ( stderr , " [algo_thunder] Set timeout on link %d of %ld ms (packets expected: %ld, seen: %ld) \n " ,
i , timeout , expected , thunderc - > received_pkts_on_link [ i ] ) ;
}
2019-08-27 15:28:14 +00:00
}
2019-08-28 14:33:43 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] Classify done \n " ) ;
2019-08-29 15:53:49 +00:00
printf ( " Blacklisted links: " ) ;
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
if ( is_blacklisted ( thunderc , i ) ) printf ( " _ " ) ;
else printf ( " U " ) ;
}
printf ( " \n " ) ;
2019-08-26 15:48:22 +00:00
}
2019-08-28 09:35:43 +00:00
struct unpad_info {
2019-09-16 12:40:58 +00:00
union abstract_packet * ap_arr_pl [ MAX_LINKS ] ,
* ap_arr_meta [ MAX_LINKS ] ;
2019-08-28 09:35:43 +00:00
uint8_t ap_arr_vals ;
} ;
void unpad ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp , struct unpad_info * ui ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
for ( union abstract_packet * ap = buffer_first_ap ( bp ) ; ap ! = NULL ; ap = ap_next ( ap ) ) {
if ( ap - > fmt . headers . cmd ! = CMD_UDP_ENCAPSULATED ) continue ;
2019-08-09 15:01:28 +00:00
2019-08-28 09:35:43 +00:00
union abstract_packet * ap_meta = ap_next ( ap ) ;
if ( ap_meta = = NULL | | ap_meta - > fmt . headers . cmd ! = CMD_UDP_METADATA_THUNDER ) {
fprintf ( stderr , " Unexpected packet, expecting udp metadata \n " ) ;
}
2019-09-16 12:40:58 +00:00
ui - > ap_arr_pl [ ui - > ap_arr_vals ] = ap ;
ui - > ap_arr_meta [ ui - > ap_arr_vals ] = ap_meta ;
ui - > ap_arr_vals + + ;
2019-08-28 09:35:43 +00:00
}
2019-08-28 14:33:43 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] Unpad done \n " ) ;
2019-08-26 15:48:22 +00:00
}
2019-09-16 12:40:58 +00:00
/*
 * qsort-compatible comparator for int64_t values.
 * Returns -1, 0 or 1. The values are compared rather than subtracted:
 * a 64-bit difference does not fit in an int and may overflow.
 */
int compare_int64(const void *a, const void *b) {
  const int64_t x = *(const int64_t *) a;
  const int64_t y = *(const int64_t *) b;
  return (x > y) - (x < y);
}
void get_estimation ( struct thunder_ctx * thunderc , int64_t * sorted_estimation , uint64_t * sorted_occurencies ) {
uint64_t deleted = 0 ;
memcpy ( sorted_estimation , thunderc - > estimated_sent , thunderc - > total_links * sizeof ( int64_t ) ) ;
qsort ( sorted_estimation , thunderc - > total_links , sizeof ( int64_t ) , compare_int64 ) ;
for ( int i = 0 ; i < thunderc - > total_links - 1 ; i + + ) {
if ( sorted_estimation [ i - deleted ] = = sorted_estimation [ i - deleted + 1 ] ) {
for ( int j = i - deleted + 1 ; j < thunderc - > total_links - 1 - deleted ; j + + ) {
sorted_estimation [ j ] = sorted_estimation [ j + 1 ] ;
}
deleted + + ;
}
}
* sorted_occurencies = thunderc - > total_links - deleted ;
}
2019-08-28 12:57:20 +00:00
void adapt ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp , struct unpad_info * ui ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
char url [ 256 ] ;
struct evt_core_fdinfo * to_fdinfo = NULL ;
2019-08-29 15:53:49 +00:00
uint64_t delivered = 0 ;
2019-08-26 15:48:22 +00:00
2019-09-16 12:40:58 +00:00
int64_t sorted_estimation [ MAX_LINKS ] ;
uint64_t sorted_occurencies ;
get_estimation ( thunderc , sorted_estimation , & sorted_occurencies ) ;
/*
printf ( " %d packets with estimated jitter " , ui - > ap_arr_vals ) ;
for ( int i = 0 ; i < sorted_occurencies ; i + + ) {
printf ( " %ld, " , sorted_estimation [ i ] ) ;
}
printf ( " \n " ) ; */
2019-08-28 16:27:06 +00:00
for ( int i = ui - > ap_arr_vals - 1 ; i > = 0 ; i - - ) {
2019-08-29 09:27:34 +00:00
//fprintf(stderr, "i=%d, ui->ap_arr_vals=%d\n", i, ui->ap_arr_vals);
2019-09-16 12:40:58 +00:00
if ( ui - > ap_arr_meta [ i ] - > fmt . content . udp_metadata_thunder . id < = thunderc - > recv_id ) continue ; // already delivered
2019-08-28 12:57:20 +00:00
thunderc - > recv_id = ui - > ap_arr_meta [ i ] - > fmt . content . udp_metadata_thunder . id ;
2019-09-16 12:40:58 +00:00
int64_t estimation_index = i > sorted_occurencies - 1 ? sorted_occurencies - 1 : i ;
if ( sorted_estimation [ estimation_index ] > thunderc - > allowed_jitter_ms ) {
continue ;
}
2019-08-28 12:57:20 +00:00
// Find destination
sprintf ( url , " udp:write:127.0.0.1:%d " , ui - > ap_arr_pl [ i ] - > fmt . content . udp_encapsulated . port ) ;
to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
if ( to_fdinfo = = NULL ) {
fprintf ( stderr , " No fd for URL %s in tcp-read. Dropping packet :( \n " , url ) ;
}
struct buffer_packet * bp_dest = inject_buffer_tow ( & app_ctx - > br , to_fdinfo ) ;
2019-08-29 09:27:34 +00:00
bp_dest - > mode = BP_WRITING ;
//dump_buffer_packet (bp_dest);
2019-08-28 12:57:20 +00:00
buffer_append_ap ( bp_dest , ui - > ap_arr_pl [ i ] ) ;
main_on_udp_write ( ctx , to_fdinfo ) ;
2019-08-29 15:53:49 +00:00
delivered + + ;
}
if ( delivered ! = 1 ) {
//printf("[algo_thunder] Delivered %ld packets (now id=%d)\n", delivered, thunderc->recv_id);
2019-08-28 12:57:20 +00:00
}
mv_buffer_rtof ( & app_ctx - > br , fdinfo ) ;
2019-08-28 14:33:43 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_thunder] Adapt done \n " ) ;
2019-08-26 15:48:22 +00:00
}
int algo_thunder_on_stream ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
2019-08-28 09:35:43 +00:00
struct unpad_info ui = { 0 } ;
2019-08-27 15:28:14 +00:00
classify ( ctx , fdinfo , bp ) ;
2019-08-28 09:35:43 +00:00
unpad ( ctx , fdinfo , bp , & ui ) ;
2019-08-28 12:57:20 +00:00
adapt ( ctx , fdinfo , bp , & ui ) ;
2019-08-09 15:01:28 +00:00
return 0 ;
}
/*
 * Datagram-side entry point: runs prepare(), pad() and schedule() in that
 * order on bp. The value returned by schedule() is ignored; always
 * returns 0.
 */
int algo_thunder_on_datagram(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo, struct buffer_packet *bp) {
  prepare(ctx, fdinfo, bp);
  pad(ctx, fdinfo, bp);
  (void) schedule(ctx, fdinfo, bp);

  return 0;
}
2019-08-28 14:33:43 +00:00
/*
 * Release the algorithm private context; installed as free_misc by
 * algo_thunder_init(). v is passed straight to free().
 */
void algo_thunder_free(void *v) {
  free(v);
}
2019-08-28 12:57:20 +00:00
void algo_thunder_init ( struct evt_core_ctx * ctx , struct algo_ctx * app_ctx , struct algo_params * ap ) {
app_ctx - > misc = malloc ( sizeof ( struct thunder_ctx ) ) ;
2019-08-28 14:33:43 +00:00
app_ctx - > free_misc = algo_thunder_free ;
2019-08-28 12:57:20 +00:00
if ( app_ctx - > misc = = NULL ) {
perror ( " malloc failed in algo thunder init " ) ;
exit ( EXIT_FAILURE ) ;
}
memset ( app_ctx - > misc , 0 , sizeof ( struct thunder_ctx ) ) ;
struct thunder_ctx * thunderc = app_ctx - > misc ;
thunderc - > recv_id = 1 ;
thunderc - > emit_id = 1 ;
thunderc - > total_links = app_ctx - > ap . links ;
2019-08-29 16:11:50 +00:00
thunderc - > selected_link = thunderc - > total_links - 1 ;
2019-09-06 13:28:42 +00:00
thunderc - > allowed_jitter_ms = 200 ;
2019-09-15 07:48:30 +00:00
for ( int i = 0 ; i < thunderc - > total_links ; i + + ) {
thunderc - > received_pkts_on_link [ i ] = 1 ;
thunderc - > rcv_delta_t_per_link [ i ] = 0 ;
}
2019-08-28 12:57:20 +00:00
union abstract_packet links = { } ;
//fprintf(stderr, "Total links %d\n", thunderc->total_links);
thunderc - > monit_pkt_size = sizeof ( links . fmt . headers ) + sizeof ( links . fmt . content . link_monitoring_thunder ) + sizeof ( struct link_info ) * ( thunderc - > total_links - 1 ) ;
2019-08-28 15:49:01 +00:00
2019-09-06 13:28:42 +00:00
if ( ap - > algo_specific_params ! = NULL ) {
sscanf ( ap - > algo_specific_params , " %ld " , & thunderc - > allowed_jitter_ms ) ;
}
printf ( " Allowed jitter set to %ld ms \n " , thunderc - > allowed_jitter_ms ) ;
2019-08-28 15:49:01 +00:00
init_timer ( ctx ) ;
2019-08-09 15:01:28 +00:00
}
int algo_thunder_on_err ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
if ( strstr ( fdinfo - > cat - > name , " udp " ) ! = NULL ) return 1 ;
return 0 ;
}