2019-10-07 14:07:35 +00:00
# include <sys/timerfd.h>
# include "algo_utils.h"
# include "utils.h"
# include "url.h"
# include "proxy.h"
# include "timer.h"
# include "proxy.h"
2019-10-09 15:07:51 +00:00
# include "measure.h"
2019-11-01 14:34:23 +00:00
# define HISTORIC_SIZE 2048
2019-10-07 14:07:35 +00:00
# define MAX_LINKS 64
/*
 * Delivery state of a packet as recorded in the reordering history.
 * The numeric values are used as indices into ooo_state_str.
 */
enum ooo_state {
  IN_ORDER = 0,     // received without having been flagged as missing first
  OOO_ONGOING = 1,  // flagged as missing by a later packet, not received yet
  OOO_DONE = 2      // flagged as missing, then eventually received
};

/* Printable name of every ooo_state value, indexed by the enum constant. */
char *ooo_state_str[] = {
  [IN_ORDER] = " IN_ORDER ",
  [OOO_ONGOING] = " OOO_ONGOING ",
  [OOO_DONE] = " OOO_DONE "
};
2019-10-16 13:22:16 +00:00
/*
 * Which group(s) of links the next datagram is sent on.
 * The numeric values index both tables below.
 */
enum schedule_group_target {
  SCHEDULE_BOTH = 0,  // one copy on a fast link and one copy on a slow link
  SCHEDULE_FAST = 1,  // fast link only
  SCHEDULE_SLOW = 2   // slow link only
};

/*
 * Strategy to apply to the next datagram, indexed by the current one:
 * BOTH stays BOTH, while FAST and SLOW alternate.
 */
int schedule_group_target_trans[] = {
  [SCHEDULE_BOTH] = SCHEDULE_BOTH,
  [SCHEDULE_FAST] = SCHEDULE_SLOW,
  [SCHEDULE_SLOW] = SCHEDULE_FAST
};

/* Printable name of every schedule_group_target value. */
char *schedule_group_target_str[] = {
  [SCHEDULE_BOTH] = " SCHEDULE_BOTH ",
  [SCHEDULE_FAST] = " SCHEDULE_FAST ",
  [SCHEDULE_SLOW] = " SCHEDULE_SLOW "
};
2019-10-22 08:22:17 +00:00
/*
 * Category currently assigned to a link by the scheduler.
 * The numeric values index link_cat_str.
 */
enum link_cat {
  LINK_FAST = 0,      // in use, among the best ranked active links
  LINK_SLOW = 1,      // in use, but not among the best ranked ones
  LINK_NOT_USED = 2   // currently excluded from scheduling
};

/* Printable name of every link_cat value, indexed by the enum constant. */
char *link_cat_str[] = {
  [LINK_FAST] = " LINK_FAST ",
  [LINK_SLOW] = " LINK_SLOW ",
  [LINK_NOT_USED] = " LINK_NOT_USED "
};
2019-10-08 12:47:35 +00:00
/*
 * Aggregated out-of-order score of one link. The array of these entries is
 * sorted by score, so link_id records which link an entry refers to.
 */
struct stat_entry {
  uint8_t link_id;   // index of the link this entry describes
  int64_t ooo;       // out-of-order delay score; negative values are sentinels
                     // (-1: no measurement, -2: link has no socket)
  int64_t meas_occ;  // number of history entries folded into the score
};
2019-10-07 14:07:35 +00:00
/*
 * One slot of the reception history: what we know about the delivery of a
 * single monitoring packet id.
 */
struct timing_entry {
  enum ooo_state state;          // delivery state of the packet
  struct timespec detected_at;   // when the packet was first noticed missing
  struct timespec finished_at;   // when the packet was actually received
  uint8_t link_id;               // link the packet was sent on / received from
  uint64_t pkt_id;               // monitoring packet id stored in this slot
};
2019-10-21 16:21:57 +00:00
struct link_status {
struct timespec last ;
2019-10-22 08:22:17 +00:00
enum link_cat used ;
2019-10-21 16:21:57 +00:00
} ;
2019-10-07 14:07:35 +00:00
struct light_ctx {
2019-10-07 16:17:44 +00:00
uint8_t prev_links [ MAX_LINKS ] ;
2019-10-11 12:45:21 +00:00
int16_t remote_stats [ MAX_LINKS ] ;
int16_t local_stats [ MAX_LINKS ] ;
2019-10-07 14:07:35 +00:00
struct timing_entry historic [ HISTORIC_SIZE ] ;
2019-10-21 16:21:57 +00:00
struct link_status status [ MAX_LINKS ] ;
2020-02-04 15:09:26 +00:00
uint8_t active ;
2019-10-07 14:07:35 +00:00
uint64_t pkt_rcv_id ;
uint64_t pkt_sent_id ;
2019-10-16 09:24:41 +00:00
uint64_t uniq_pkt_sent_id ;
2019-10-07 14:07:35 +00:00
uint8_t selected_link ;
uint8_t total_links ;
2019-10-08 12:47:35 +00:00
int fast_count ;
2019-10-07 16:17:44 +00:00
int sent_past_links ;
2019-10-07 14:07:35 +00:00
struct timespec window ;
2019-10-21 16:21:57 +00:00
struct timespec last_update_used ;
2019-10-07 14:07:35 +00:00
size_t monit_pkt_size ;
2019-10-16 14:45:10 +00:00
int csv ;
int is_measlat ;
int explain ;
int disable_scheduler ;
2019-10-22 08:22:17 +00:00
struct stat_entry stats [ MAX_LINKS ] ;
2019-10-16 13:22:16 +00:00
enum schedule_group_target sched_strat ;
2019-10-07 14:07:35 +00:00
} ;
/*
 * Release the algorithm context allocated by algo_lightning_init.
 * v: pointer previously stored in algo_ctx.misc (may be NULL).
 */
void algo_lightning_free(void *v) {
  // The context owns no nested allocation, releasing the block is enough.
  free(v);
}
void algo_lightning_init ( struct evt_core_ctx * ctx , struct algo_ctx * app_ctx , struct algo_params * ap ) {
app_ctx - > misc = malloc ( sizeof ( struct light_ctx ) ) ;
app_ctx - > free_misc = algo_lightning_free ;
if ( app_ctx - > misc = = NULL ) {
perror ( " malloc failed in algo lightning init " ) ;
exit ( EXIT_FAILURE ) ;
}
memset ( app_ctx - > misc , 0 , sizeof ( struct light_ctx ) ) ;
struct light_ctx * lightc = app_ctx - > misc ;
lightc - > total_links = app_ctx - > ap . links ;
lightc - > selected_link = lightc - > total_links - 1 ;
2019-10-23 12:55:24 +00:00
lightc - > sent_past_links = lightc - > total_links / 2 ;
2019-10-21 16:21:57 +00:00
lightc - > fast_count = lightc - > total_links / 4 ;
2019-10-08 14:48:00 +00:00
lightc - > csv = 0 ;
2019-10-11 14:00:56 +00:00
lightc - > explain = 0 ;
2019-10-16 09:24:41 +00:00
lightc - > pkt_sent_id = 1 ;
lightc - > uniq_pkt_sent_id = 1 ;
2019-10-16 12:39:54 +00:00
lightc - > disable_scheduler = 0 ;
2020-02-04 15:09:26 +00:00
lightc - > active = 0 ;
2019-10-16 13:22:16 +00:00
lightc - > sched_strat = SCHEDULE_BOTH ;
2019-10-07 14:07:35 +00:00
2019-10-08 12:47:35 +00:00
uint64_t window = 2000 ;
2019-10-07 14:07:35 +00:00
if ( ap - > algo_specific_params ! = NULL ) {
char * parse_ptr , * token , * params ;
for ( params = ap - > algo_specific_params ; ; params = NULL ) {
2019-10-17 09:00:18 +00:00
token = strtok_r ( params , " ! " , & parse_ptr ) ;
2019-10-07 14:07:35 +00:00
if ( token = = NULL ) break ;
2019-10-08 12:47:35 +00:00
sscanf ( token , " fast_count=%d " , & lightc - > fast_count ) ;
2019-10-07 14:07:35 +00:00
sscanf ( token , " window=%ld " , & window ) ;
2019-10-07 16:17:44 +00:00
sscanf ( token , " sent_past_links=%d " , & lightc - > sent_past_links ) ;
2019-10-11 12:45:21 +00:00
sscanf ( token , " csv=%d " , & lightc - > csv ) ;
2019-10-11 14:00:56 +00:00
sscanf ( token , " measlat=%d " , & lightc - > is_measlat ) ;
sscanf ( token , " explain=%d " , & lightc - > explain ) ;
2019-10-16 12:39:54 +00:00
sscanf ( token , " disable_scheduler=%d " , & lightc - > disable_scheduler ) ;
2019-10-16 13:22:16 +00:00
sscanf ( token , " tick_tock=%d " , & lightc - > sched_strat ) ;
2019-10-07 14:07:35 +00:00
}
}
2019-10-08 08:54:27 +00:00
for ( int i = 0 ; i < lightc - > sent_past_links ; i + + )
lightc - > prev_links [ i ] = UINT8_MAX ;
2019-10-22 08:22:17 +00:00
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2020-02-04 15:09:26 +00:00
lightc - > status [ i ] . used = LINK_NOT_USED ;
2019-10-21 16:21:57 +00:00
}
2019-10-07 14:07:35 +00:00
union abstract_packet m ;
2019-10-11 09:01:09 +00:00
lightc - > monit_pkt_size =
sizeof ( m . fmt . headers ) +
sizeof ( m . fmt . content . link_monitoring_lightning ) +
sizeof ( uint8_t ) * ( lightc - > sent_past_links - 1 ) +
2019-10-11 12:45:21 +00:00
sizeof ( int16_t ) * lightc - > total_links ;
2019-10-08 08:22:14 +00:00
timespec_set_unit ( & lightc - > window , window , MILISEC ) ;
2019-10-07 14:07:35 +00:00
2019-10-08 12:47:35 +00:00
printf ( " fast_count = %d \n " , lightc - > fast_count ) ;
2019-10-07 16:17:44 +00:00
printf ( " window check = %ld ms \n " , window ) ;
printf ( " sent_past_links = %d \n " , lightc - > sent_past_links ) ;
2019-10-09 15:07:51 +00:00
printf ( " csv = %s \n " , lightc - > csv ? " yes " : " no " ) ;
printf ( " measlat = %s \n " , lightc - > is_measlat ? " yes " : " no " ) ;
2019-10-11 14:00:56 +00:00
printf ( " explain = %s \n " , lightc - > explain ? " yes " : " no " ) ;
2019-10-16 12:39:54 +00:00
printf ( " disable_scheduler = %s \n " , lightc - > disable_scheduler ? " yes " : " no " ) ;
2019-10-16 13:22:16 +00:00
printf ( " schedule_group_target = %s \n " , schedule_group_target_str [ lightc - > sched_strat ] ) ;
2019-10-07 14:07:35 +00:00
}
2019-10-15 16:38:51 +00:00
void algo_lightning_pad ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct light_ctx * lightc = app_ctx - > misc ;
2019-10-16 09:24:41 +00:00
uint64_t ref = lightc - > uniq_pkt_sent_id ;
2019-10-15 16:38:51 +00:00
// 0. Store current buffer to application
2019-10-16 12:33:55 +00:00
//fprintf(stderr, " [algo_lightning] Store buffer with pointer %p\n", (void*) ref);
2019-10-15 16:38:51 +00:00
dup_buffer_toa ( & app_ctx - > br , bp , ( void * ) ref ) ;
// 1. Clean old buffers (we keep only thunderc->total_links buffer, keeping more would be useless)
2019-10-16 12:33:55 +00:00
//fprintf(stderr, " [algo_lightning] Clean queue\n");
2019-10-15 16:38:51 +00:00
if ( ref > lightc - > total_links & & get_app_buffer ( & app_ctx - > br , ( void * ) ( ref - lightc - > total_links ) ) ) {
mv_buffer_atof ( & app_ctx - > br , ( void * ) ( ref - lightc - > total_links ) ) ;
}
// 2. Append abstract packets stored in our buffers
uint64_t add_ref = ref ;
while ( 1 ) {
2019-10-16 12:33:55 +00:00
//fprintf(stderr, " [algo_lightning] Enter loop with ref %ld\n", add_ref);
2019-10-16 09:24:41 +00:00
if ( add_ref < 1 ) {
2019-10-16 12:33:55 +00:00
//fprintf(stderr, " [algo_lightning] add_ref=%ld < 1\n", add_ref);
2019-10-16 09:24:41 +00:00
break ;
}
2019-10-15 16:38:51 +00:00
add_ref - - ;
struct buffer_packet * bp_iter = get_app_buffer ( & app_ctx - > br , ( void * ) add_ref ) ;
2019-10-16 09:24:41 +00:00
if ( bp_iter = = NULL ) {
2019-10-16 12:33:55 +00:00
//fprintf(stderr, " [algo_lightning] bp_iter=%p == NULL\n", bp_iter);
2019-10-16 09:24:41 +00:00
break ;
}
2019-10-15 16:38:51 +00:00
union abstract_packet * ap = buffer_first_ap ( bp_iter ) ;
if ( ap - > fmt . headers . cmd ! = CMD_UDP_ENCAPSULATED ) {
2019-10-16 12:33:55 +00:00
//fprintf(stderr, "Invalid buffer payload!\n");
2019-10-15 16:38:51 +00:00
exit ( EXIT_FAILURE ) ;
}
2019-10-16 12:33:55 +00:00
//fprintf(stderr, " [algo_lightning] Currently %ld bytes, would be %ld\n", buffer_full_size (bp), buffer_full_size (bp) + ap->fmt.headers.size);
2019-10-15 16:38:51 +00:00
if ( buffer_full_size ( bp ) + ap - > fmt . headers . size > TOR_CELL_SIZE - lightc - > monit_pkt_size ) break ;
buffer_append_ap ( bp , ap ) ;
2019-10-16 12:33:55 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_lightning] Pad packet (now %ld bytes) \n " , buffer_full_size ( bp ) ) ;
2019-10-15 16:38:51 +00:00
}
}
2019-10-07 16:17:44 +00:00
void monitoring ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct light_ctx * lightc = app_ctx - > misc ;
union abstract_packet * ap = ( union abstract_packet * ) & bp - > ip ;
while ( ap ! = NULL & & ap - > fmt . headers . cmd ! = CMD_LINK_MONITORING_LIGHTNING ) ap = ap_next ( ap ) ;
if ( ap = = NULL ) {
fprintf ( stderr , " [algo_lightning] Unable to find our monitoring information \n " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-10-11 09:01:09 +00:00
uint8_t * prev_links = & ap - > fmt . content . link_monitoring_lightning . dyn_struct ;
2019-10-11 12:45:21 +00:00
int16_t * remote_stats = ( int16_t * ) ( prev_links + sizeof ( uint8_t ) * lightc - > sent_past_links ) ;
2019-10-11 14:00:56 +00:00
if ( lightc - > explain ) {
printf ( " (monitoring.stats) " ) ;
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
printf ( " %d, " , remote_stats [ i ] ) ;
}
printf ( " \n " ) ;
2019-10-11 12:45:21 +00:00
}
2019-10-11 09:01:09 +00:00
2019-10-07 16:17:44 +00:00
int64_t pkt_id = ap - > fmt . content . link_monitoring_lightning . id ;
int64_t missing = pkt_id - ( lightc - > pkt_rcv_id + 1 ) ;
2019-10-11 09:01:09 +00:00
if ( pkt_id > lightc - > pkt_rcv_id ) {
lightc - > pkt_rcv_id = pkt_id ;
2019-10-11 12:45:21 +00:00
memcpy ( & lightc - > remote_stats , remote_stats , sizeof ( int16_t ) * lightc - > total_links ) ;
2019-10-11 09:01:09 +00:00
}
//printf("internal packet %ld (%ld)\n", pkt_id, missing);
2019-10-07 16:17:44 +00:00
struct timespec now ;
set_now ( & now ) ;
// Detect OoO
for ( int i = 0 ; i < missing & & i < lightc - > sent_past_links ; i + + ) {
uint8_t link_id = prev_links [ i ] ;
int64_t miss_id = pkt_id - ( i + 1 ) ;
struct timing_entry * te = & lightc - > historic [ miss_id % HISTORIC_SIZE ] ;
2019-10-31 17:05:47 +00:00
if ( te - > pkt_id > = miss_id ) continue ; // Entry already exists
2019-10-07 16:17:44 +00:00
te - > state = OOO_ONGOING ;
te - > detected_at = now ;
te - > link_id = link_id ;
te - > pkt_id = miss_id ;
2019-10-11 14:00:56 +00:00
if ( lightc - > explain ) printf ( " (monitoring.delay) packet=%ld, link=%d, state=%s \n " , miss_id , link_id , ooo_state_str [ te - > state ] ) ;
2019-10-07 16:17:44 +00:00
}
// Update current packet status
int link_id = url_get_port_int ( fdinfo - > url ) - 7500 ;
struct timing_entry * te2 = & lightc - > historic [ pkt_id % HISTORIC_SIZE ] ;
te2 - > state = te2 - > pkt_id = = pkt_id ? OOO_DONE : IN_ORDER ;
te2 - > pkt_id = pkt_id ;
te2 - > link_id = link_id ;
te2 - > finished_at = now ;
2019-10-11 14:00:56 +00:00
if ( lightc - > explain ) printf ( " (monitoring.rcv) packet=%ld, link=%d, state=%s \n " , pkt_id , link_id , ooo_state_str [ te2 - > state ] ) ;
2019-10-07 16:17:44 +00:00
}
int deliver ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
char url [ 256 ] ;
struct evt_core_fdinfo * to_fdinfo = NULL ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
2019-10-07 14:07:35 +00:00
union abstract_packet * ap = ( union abstract_packet * ) & bp - > ip ;
2019-10-07 16:17:44 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_lightning] 1/2 Find destination \n " ) ;
sprintf ( url , " udp:write:127.0.0.1:%d " , ap - > fmt . content . udp_encapsulated . port ) ;
to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
if ( to_fdinfo = = NULL ) {
fprintf ( stderr , " No fd for URL %s in tcp-read. Dropping packet :( \n " , url ) ;
mv_buffer_wtof ( & app_ctx - > br , fdinfo ) ;
return 1 ;
}
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [algo_lightning] 2/2 Move buffer \n " ) ;
mv_buffer_rtow ( & app_ctx - > br , fdinfo , to_fdinfo ) ;
main_on_udp_write ( ctx , to_fdinfo ) ;
2019-10-07 14:07:35 +00:00
return 0 ;
}
2019-10-07 16:17:44 +00:00
/*
 * Handle a buffer received from the peer: update the link statistics from
 * its monitoring packet, then deliver it locally.
 * Returns the result of deliver().
 */
int algo_lightning_on_stream(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo, struct buffer_packet *bp) {
  int delivery_status;

  // Statistics must be updated first: delivering moves the buffer away.
  monitoring(ctx, fdinfo, bp);
  delivery_status = deliver(ctx, fdinfo, bp);
  return delivery_status;
}
2019-10-08 12:47:35 +00:00
int compare_stat_entry_max ( const void * a , const void * b ) {
const struct stat_entry * sea = a , * seb = b ;
2020-02-04 15:09:26 +00:00
int ra = sea - > ooo ;
int rb = seb - > ooo ;
if ( ra < 0 ) ra = INT16_MAX + - ra ;
if ( rb < 0 ) rb = INT16_MAX + - rb ;
return ra - rb ;
2019-10-08 12:47:35 +00:00
}
2020-02-04 13:10:27 +00:00
void algo_lightning_update_stats ( struct light_ctx * lightc , struct evt_core_ctx * ctx ) {
2019-10-07 14:07:35 +00:00
struct timespec now , not_before = { 0 } , temp_time ;
set_now ( & now ) ;
timespec_diff ( & now , & lightc - > window , & not_before ) ;
2019-10-11 09:01:09 +00:00
// Init
2019-10-08 12:47:35 +00:00
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2019-10-22 08:22:17 +00:00
lightc - > stats [ i ] . link_id = i ;
lightc - > stats [ i ] . meas_occ = 0 ;
lightc - > stats [ i ] . ooo = - 1 ;
2019-10-08 12:47:35 +00:00
}
2019-10-07 14:07:35 +00:00
2019-10-11 09:01:09 +00:00
// Compute local stats
2019-10-07 14:07:35 +00:00
for ( int i = 0 ; i < HISTORIC_SIZE ; i + + ) {
2019-10-11 12:45:21 +00:00
if ( timespec_lt ( & lightc - > historic [ i ] . finished_at , & not_before ) & & lightc - > historic [ i ] . state ! = OOO_ONGOING ) continue ;
2019-10-08 08:54:27 +00:00
uint8_t l = lightc - > historic [ i ] . link_id ;
if ( l > = lightc - > total_links ) continue ;
2019-10-07 14:07:35 +00:00
int64_t delta = 0 ;
switch ( lightc - > historic [ i ] . state ) {
case IN_ORDER :
2020-02-04 10:44:20 +00:00
delta = 0 ;
2019-10-07 14:07:35 +00:00
break ;
case OOO_ONGOING :
timespec_diff ( & now , & lightc - > historic [ i ] . detected_at , & temp_time ) ;
2019-10-08 08:22:14 +00:00
delta = timespec_get_unit ( & temp_time , MILISEC ) ;
2019-10-07 14:07:35 +00:00
break ;
case OOO_DONE :
timespec_diff ( & lightc - > historic [ i ] . finished_at , & lightc - > historic [ i ] . detected_at , & temp_time ) ;
2019-10-08 08:22:14 +00:00
delta = timespec_get_unit ( & temp_time , MILISEC ) ;
2019-10-07 14:07:35 +00:00
break ;
}
2020-02-04 10:44:20 +00:00
lightc - > stats [ l ] . ooo + = delta ;
lightc - > stats [ l ] . meas_occ + = 1 ;
2019-10-15 12:22:45 +00:00
if ( lightc - > explain ) printf ( " (stats.compute) packet=%ld, link=%d, status=%s, delta=%ld \n " , lightc - > historic [ i ] . pkt_id , l , ooo_state_str [ lightc - > historic [ i ] . state ] , delta ) ;
2019-10-22 08:22:17 +00:00
lightc - > stats [ l ] . link_id = l ;
2019-10-18 16:08:56 +00:00
if ( lightc - > explain ) printf ( " (stats.local) link=%d, delta=%ld \n " , l , delta ) ;
/*if (delta > stats[l].ooo) {
2019-10-15 12:22:45 +00:00
if ( lightc - > explain ) printf ( " (stats.local) link=%d, delta=%ld \n " , l , delta ) ;
2019-10-18 16:08:56 +00:00
stats [ l ] . ooo = delta ;
} */
}
// Compute average
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2020-02-04 13:10:27 +00:00
if ( lightc - > stats [ i ] . meas_occ < = 0 ) lightc - > stats [ i ] . ooo = - 1 ;
else lightc - > stats [ i ] . ooo = lightc - > stats [ i ] . ooo / lightc - > stats [ i ] . meas_occ ;
2020-02-04 15:09:26 +00:00
lightc - > local_stats [ i ] = lightc - > stats [ i ] . ooo ;
2019-10-07 14:07:35 +00:00
}
2019-10-08 12:47:35 +00:00
2019-10-11 09:01:09 +00:00
// Set my local stats + merge remote stats
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2019-10-22 09:26:13 +00:00
/* AVG */
2020-02-04 15:09:26 +00:00
if ( lightc - > remote_stats [ i ] < 0 ) continue ;
if ( lightc - > stats [ i ] . ooo < 0 ) lightc - > stats [ i ] . ooo = lightc - > remote_stats [ i ] ;
2019-10-22 08:22:17 +00:00
else lightc - > stats [ i ] . ooo = ( lightc - > remote_stats [ i ] + lightc - > stats [ i ] . ooo ) / 2 ;
2019-10-22 09:26:13 +00:00
/* MAX
if ( lightc - > remote_stats [ i ] > lightc - > stats [ i ] . ooo ) {
2019-10-15 12:22:45 +00:00
if ( lightc - > explain ) printf ( " (stats.remote) link=%d, delta=%d \n " , i , lightc - > remote_stats [ i ] ) ;
2019-10-22 09:26:13 +00:00
lightc - > stats [ i ] . ooo = lightc - > remote_stats [ i ] ;
} */
2019-10-11 09:01:09 +00:00
}
2020-02-04 13:10:27 +00:00
// Disable broken links
char url [ 256 ] ;
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
sprintf ( url , " tcp:write:127.0.0.1:%d " , 7500 + i ) ;
struct evt_core_fdinfo * to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
2020-02-04 15:09:26 +00:00
if ( to_fdinfo = = NULL ) lightc - > stats [ i ] . ooo = - 2 ;
2020-02-04 13:10:27 +00:00
}
2019-10-11 09:01:09 +00:00
// Sort
2019-10-16 14:45:10 +00:00
if ( ! lightc - > disable_scheduler ) {
2019-10-22 08:22:17 +00:00
qsort ( lightc - > stats , lightc - > total_links , sizeof ( struct stat_entry ) , compare_stat_entry_max ) ;
2019-10-16 14:45:10 +00:00
}
2019-10-07 14:07:35 +00:00
}
int send_message ( struct evt_core_ctx * ctx , struct buffer_packet * bp ) {
char url [ 256 ] ;
struct evt_core_cat * cat = evt_core_get_from_cat ( ctx , " tcp-write " ) ;
if ( cat = = NULL ) {
fprintf ( stderr , " [algo_lightning] cat tcp-write not found \n " ) ;
exit ( EXIT_FAILURE ) ;
}
struct algo_ctx * app_ctx = cat - > app_ctx ;
struct light_ctx * lightc = app_ctx - > misc ;
2020-02-04 10:44:20 +00:00
if ( lightc - > selected_link > = lightc - > total_links ) {
fprintf ( stderr , " [algo_lightning] PACKET DROPPED! Selected link id %d is greater than the total number of links %d \n " , lightc - > selected_link , lightc - > total_links ) ;
return 0 ;
}
2019-10-21 16:21:57 +00:00
set_now ( & lightc - > status [ lightc - > selected_link ] . last ) ;
2019-10-07 14:07:35 +00:00
sprintf ( url , " tcp:write:127.0.0.1:%d " , 7500 + lightc - > selected_link ) ;
struct evt_core_fdinfo * to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
2020-02-04 10:44:20 +00:00
if ( to_fdinfo = = NULL ) {
fprintf ( stderr , " [algo_lightning] PACKET DROPPED! We don't have any entry for %s currently \n " , url ) ;
return 0 ;
}
2019-10-07 14:07:35 +00:00
struct buffer_packet * bp_dup = dup_buffer_tow ( & app_ctx - > br , bp , to_fdinfo ) ;
union abstract_packet monit = {
. fmt . headers . cmd = CMD_LINK_MONITORING_LIGHTNING ,
. fmt . headers . size = lightc - > monit_pkt_size ,
. fmt . headers . flags = 0 ,
2019-10-07 16:17:44 +00:00
. fmt . content . link_monitoring_lightning . id = lightc - > pkt_sent_id
2019-10-07 14:07:35 +00:00
} ;
2019-10-07 16:17:44 +00:00
union abstract_packet * ap_buf = buffer_append_ap ( bp_dup , & monit ) ;
2019-10-11 09:01:09 +00:00
uint8_t * links = & ap_buf - > fmt . content . link_monitoring_lightning . dyn_struct ;
2019-10-11 12:45:21 +00:00
int16_t * remote_stats = ( int16_t * ) ( links + sizeof ( uint8_t ) * lightc - > sent_past_links ) ;
2019-10-11 09:01:09 +00:00
2019-10-07 16:17:44 +00:00
for ( int i = 0 ; i < lightc - > sent_past_links ; i + + ) {
2019-10-08 08:10:59 +00:00
links [ i ] = lightc - > prev_links [ ( lightc - > pkt_sent_id - ( i + 1 ) ) % MAX_LINKS ] ;
2019-10-07 16:17:44 +00:00
}
2019-10-11 12:45:21 +00:00
memcpy ( remote_stats , & lightc - > local_stats , sizeof ( int16_t ) * lightc - > total_links ) ;
2019-10-15 12:22:45 +00:00
if ( lightc - > explain ) {
printf ( " (send.stats) " ) ;
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
printf ( " %d, " , remote_stats [ i ] ) ;
}
printf ( " \n " ) ;
2019-10-11 12:45:21 +00:00
}
2019-10-07 16:17:44 +00:00
lightc - > prev_links [ lightc - > pkt_sent_id % MAX_LINKS ] = lightc - > selected_link ;
2019-10-16 09:24:41 +00:00
lightc - > pkt_sent_id + + ;
2019-10-07 14:07:35 +00:00
if ( ctx - > verbose > 1 ) {
dump_buffer_packet ( bp_dup ) ;
fprintf ( stderr , " [algo_lightning] Will send this info \n " ) ;
}
main_on_tcp_write ( ctx , to_fdinfo ) ;
return 1 ;
}
2019-10-18 13:05:02 +00:00
void tag_packet_measlat ( union abstract_packet * ap , uint8_t link_id , uint8_t is_slow ) {
2019-10-18 15:29:49 +00:00
union abstract_packet * cur = ap ;
uint8_t vanilla = 1 ;
2019-10-18 13:05:02 +00:00
2019-10-18 15:29:49 +00:00
while ( cur ! = NULL ) {
if ( ap - > fmt . headers . cmd ! = CMD_UDP_ENCAPSULATED ) {
cur = ap_next ( cur ) ;
continue ;
}
struct measure_packet * mp = ( void * ) & cur - > fmt . content . udp_encapsulated . payload ;
2019-10-18 15:31:42 +00:00
mp - > flag = 0x3f & link_id ;
2019-10-18 15:29:49 +00:00
mp - > flag | = vanilla < < 6 ;
mp - > flag | = is_slow < < 7 ;
vanilla = 0 ;
cur = ap_next ( cur ) ;
}
2019-10-18 13:05:02 +00:00
}
2019-10-22 08:22:17 +00:00
void algo_lightning_update_used ( struct light_ctx * lightc , struct timespec * now ) {
2019-10-21 16:21:57 +00:00
struct timespec not_before = { 0 } , oldest = * now ;
timespec_diff ( now , & lightc - > window , & not_before ) ;
if ( timespec_gt ( & lightc - > last_update_used , & not_before ) ) return ;
2020-02-04 15:09:26 +00:00
int used_to_not = - 1 , not_to_used = - 1 ;
2019-10-21 16:21:57 +00:00
int64_t max_ooo = 0 ;
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2019-10-22 08:22:17 +00:00
if ( lightc - > status [ lightc - > stats [ i ] . link_id ] . used = = LINK_FAST | | lightc - > status [ lightc - > stats [ i ] . link_id ] . used = = LINK_SLOW ) {
2020-02-04 15:09:26 +00:00
int64_t retained_ooo = lightc - > stats [ i ] . ooo < 0 ? INT64_MAX : lightc - > stats [ i ] . ooo ;
2019-10-22 08:22:17 +00:00
if ( retained_ooo > = max_ooo ) {
max_ooo = retained_ooo ;
used_to_not = lightc - > stats [ i ] . link_id ;
2019-10-21 16:21:57 +00:00
}
} else {
2020-02-04 15:09:26 +00:00
if ( lightc - > stats [ i ] . ooo = = - 2 ) continue ;
2019-10-22 08:22:17 +00:00
if ( timespec_lt ( & lightc - > status [ lightc - > stats [ i ] . link_id ] . last , & oldest ) ) {
oldest = lightc - > status [ lightc - > stats [ i ] . link_id ] . last ;
not_to_used = lightc - > stats [ i ] . link_id ;
2019-10-21 16:21:57 +00:00
}
}
}
// Swap them
2019-10-22 09:09:36 +00:00
//printf("Link %d will be disabled, %d will be enabled\n", used_to_not, not_to_used);
2020-02-04 15:09:26 +00:00
if ( used_to_not < 0 | | not_to_used < 0 ) return ;
2019-10-22 08:22:17 +00:00
lightc - > status [ used_to_not ] . used = LINK_NOT_USED ;
lightc - > status [ not_to_used ] . used = LINK_SLOW ;
2019-10-21 16:21:57 +00:00
lightc - > last_update_used = * now ;
}
2020-02-04 15:09:26 +00:00
void algo_lightning_link_cat ( struct light_ctx * lightc , int cur_fast_count ) {
2019-10-22 08:22:17 +00:00
uint8_t used = 0 ;
2019-10-22 09:09:36 +00:00
//printf("---\n");
2019-10-22 08:22:17 +00:00
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
if ( lightc - > status [ lightc - > stats [ i ] . link_id ] . used ! = LINK_NOT_USED ) {
2020-02-04 15:09:26 +00:00
if ( used < cur_fast_count ) lightc - > status [ lightc - > stats [ i ] . link_id ] . used = LINK_FAST ;
2019-10-22 08:22:17 +00:00
else lightc - > status [ lightc - > stats [ i ] . link_id ] . used = LINK_SLOW ;
used + + ;
}
2019-10-22 09:09:36 +00:00
//printf("Link ID=%d, status=%s, ooo=%ld\n", lightc->stats[i].link_id, link_cat_str[lightc->status[lightc->stats[i].link_id].used], lightc->stats[i].ooo);
2019-10-22 08:22:17 +00:00
}
}
2019-10-07 14:07:35 +00:00
int algo_lightning_on_datagram ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct light_ctx * lightc = app_ctx - > misc ;
union abstract_packet * ap = ( union abstract_packet * ) & bp - > ip ;
2019-10-21 16:21:57 +00:00
struct timespec now , sel_link_last ;
set_now ( & now ) ;
2019-10-07 14:07:35 +00:00
2019-10-15 16:38:51 +00:00
// Pad packet
algo_lightning_pad ( ctx , fdinfo , bp ) ;
2020-02-04 15:09:26 +00:00
// Prepare links
2020-02-04 13:10:27 +00:00
algo_lightning_update_stats ( lightc , ctx ) ;
2020-02-04 15:09:26 +00:00
// Adapt tags quantity to active links
struct evt_core_cat * cat = evt_core_get_from_cat ( ctx , " tcp-write " ) ;
int target_to_use = lightc - > fast_count * 2 < cat - > socklist - > len ? lightc - > fast_count * 2 : cat - > socklist - > len ;
int diff = target_to_use - ( ( int ) lightc - > active ) ;
for ( int i = 0 ; i < lightc - > total_links & & diff > 0 ; i + + ) {
if ( lightc - > status [ lightc - > stats [ i ] . link_id ] . used ! = LINK_NOT_USED ) continue ;
lightc - > status [ lightc - > stats [ i ] . link_id ] . used = LINK_SLOW ;
diff - - ;
}
for ( int i = lightc - > total_links - 1 ; i > = 0 & & diff < 0 ; i - - ) {
if ( lightc - > status [ lightc - > stats [ i ] . link_id ] . used = = LINK_NOT_USED ) continue ;
lightc - > status [ lightc - > stats [ i ] . link_id ] . used = LINK_NOT_USED ;
diff + + ;
}
lightc - > active = target_to_use ;
// Update link tags
2019-10-22 08:22:17 +00:00
algo_lightning_update_used ( lightc , & now ) ;
2020-02-04 15:09:26 +00:00
algo_lightning_link_cat ( lightc , target_to_use / 2 ) ;
2019-10-07 14:07:35 +00:00
2020-02-04 15:53:30 +00:00
if ( ctx - > verbose > 1 ) {
2020-02-04 15:09:26 +00:00
printf ( " link ranking (%d fast links, %d total links) \n position | port | score | class \n " , target_to_use / 2 , target_to_use ) ;
2019-10-08 12:47:35 +00:00
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2020-02-04 15:09:26 +00:00
printf ( " %8d | %4d | %9ld | %9s \n " , i , lightc - > stats [ i ] . link_id + 7500 , lightc - > stats [ i ] . ooo , link_cat_str [ lightc - > status [ lightc - > stats [ i ] . link_id ] . used ] ) ;
2019-10-08 12:47:35 +00:00
}
printf ( " \n " ) ;
2019-10-08 08:22:14 +00:00
}
2019-10-08 14:48:00 +00:00
uint64_t now_timestamp = timespec_get_unit ( & now , MILISEC ) ;
2019-10-07 14:07:35 +00:00
2019-10-08 12:47:35 +00:00
// Select fast link
2019-10-16 13:22:16 +00:00
if ( lightc - > sched_strat = = SCHEDULE_BOTH | | lightc - > sched_strat = = SCHEDULE_FAST ) {
sel_link_last = now ;
lightc - > selected_link = UINT8_MAX ;
2020-02-04 15:09:26 +00:00
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
2019-10-22 08:22:17 +00:00
if ( lightc - > status [ lightc - > stats [ i ] . link_id ] . used ! = LINK_FAST ) continue ;
if ( timespec_lt ( & lightc - > status [ lightc - > stats [ i ] . link_id ] . last , & sel_link_last ) ) {
lightc - > selected_link = lightc - > stats [ i ] . link_id ;
sel_link_last = lightc - > status [ lightc - > stats [ i ] . link_id ] . last ;
2019-10-16 13:22:16 +00:00
}
2019-10-07 14:07:35 +00:00
}
2019-10-18 13:05:02 +00:00
if ( lightc - > is_measlat ) tag_packet_measlat ( ap , lightc - > selected_link , 0 ) ;
2019-10-16 13:22:16 +00:00
send_message ( ctx , bp ) ;
if ( lightc - > csv ) printf ( " %ld,%d,fast \n " , now_timestamp , lightc - > selected_link ) ;
2019-10-07 14:07:35 +00:00
}
2019-10-16 13:22:16 +00:00
2019-10-21 16:21:57 +00:00
// Select slow link
2019-10-16 13:22:16 +00:00
if ( lightc - > sched_strat = = SCHEDULE_BOTH | | lightc - > sched_strat = = SCHEDULE_SLOW ) {
sel_link_last = now ;
lightc - > selected_link = UINT8_MAX ;
2019-10-22 08:22:17 +00:00
for ( int i = 0 ; i < lightc - > total_links ; i + + ) {
if ( lightc - > status [ lightc - > stats [ i ] . link_id ] . used ! = LINK_SLOW ) continue ;
if ( timespec_lt ( & lightc - > status [ lightc - > stats [ i ] . link_id ] . last , & sel_link_last ) ) {
lightc - > selected_link = lightc - > stats [ i ] . link_id ;
sel_link_last = lightc - > status [ lightc - > stats [ i ] . link_id ] . last ;
2019-10-16 13:22:16 +00:00
}
2019-10-16 12:39:54 +00:00
}
2019-10-18 13:05:02 +00:00
if ( lightc - > is_measlat ) tag_packet_measlat ( ap , lightc - > selected_link , 1 ) ;
2019-10-16 13:22:16 +00:00
send_message ( ctx , bp ) ;
if ( lightc - > csv ) printf ( " %ld,%d,slow \n " , now_timestamp , lightc - > selected_link ) ;
2019-10-08 14:48:00 +00:00
}
2019-10-07 14:07:35 +00:00
2019-10-16 13:22:16 +00:00
// Update our algo context
lightc - > sched_strat = schedule_group_target_trans [ lightc - > sched_strat ] ;
2019-10-16 09:24:41 +00:00
lightc - > uniq_pkt_sent_id + + ;
2019-10-07 14:07:35 +00:00
mv_buffer_rtof ( & app_ctx - > br , fdinfo ) ;
return 0 ;
}
int algo_lightning_on_err ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
2020-01-20 15:35:52 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
if ( strcmp ( " tcp-read " , fdinfo - > cat - > name ) = = 0 | | strcmp ( " tcp-write " , fdinfo - > cat - > name ) = = 0 )
return app_ctx - > ap . sr ( ctx , fdinfo ) ;
fprintf ( stderr , " %s is not eligible for a reconnect \n " , fdinfo - > url ) ;
// We do nothing
return 1 ;
2019-10-07 14:07:35 +00:00
}