2019-03-13 16:53:46 +00:00
# include <sys/timerfd.h>
2019-03-07 15:57:02 +00:00
# include "algo_skel.h"
# include "algo_utils.h"
2019-03-18 16:58:40 +00:00
# include "utils.h"
2019-03-18 09:26:02 +00:00
2019-03-19 14:39:05 +00:00
struct waited_pkt {
uint16_t id ;
int link_num ;
uint8_t on ;
2019-03-19 14:54:31 +00:00
int timer_fd ;
2019-03-19 14:39:05 +00:00
} ;
2019-03-19 12:50:38 +00:00
2019-03-18 09:26:02 +00:00
struct deferred_pkt {
int link_fd ;
2019-03-20 14:13:16 +00:00
int idx ;
2019-03-27 17:27:44 +00:00
uint16_t id ;
2019-03-20 14:13:16 +00:00
uint8_t on ;
2019-03-18 09:26:02 +00:00
} ;
struct rr_ctx {
uint8_t my_links ;
uint16_t my_links_ver ;
uint8_t remote_links ;
2019-03-19 17:50:56 +00:00
int64_t mjit ;
2019-03-18 09:26:02 +00:00
uint16_t recv_id ;
2019-03-27 15:24:39 +00:00
uint16_t recv_id_late ;
2019-03-18 09:26:02 +00:00
uint16_t sent_id ;
2019-03-18 16:58:40 +00:00
uint8_t current_link ;
2019-03-19 15:35:11 +00:00
struct timespec emit_time ;
2019-03-19 14:39:05 +00:00
struct deferred_pkt real [ PACKET_BUFFER_SIZE ] ;
struct waited_pkt wait [ PACKET_BUFFER_SIZE ] ;
2019-03-18 09:26:02 +00:00
} ;
2019-03-18 16:58:40 +00:00
int rr_on_tcp_read ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) ;
int rr_on_tcp_write ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) ;
int rr_on_udp_read ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) ;
int rr_on_udp_write ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) ;
2019-03-18 09:26:02 +00:00
2019-03-13 16:53:46 +00:00
int rr_on_tcp_co ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
int conn_sock1 , conn_sock2 ;
struct sockaddr_in addr ;
socklen_t in_len ;
char url [ 1024 ] , port [ 6 ] ;
struct evt_core_cat local_cat = { 0 } ;
struct evt_core_fdinfo to_fdinfo = { 0 } ;
to_fdinfo . cat = & local_cat ;
to_fdinfo . url = url ;
in_len = sizeof ( addr ) ;
conn_sock1 = accept ( fdinfo - > fd , ( struct sockaddr * ) & addr , & in_len ) ;
2019-03-25 16:33:08 +00:00
if ( conn_sock1 = = - 1 & & errno = = EAGAIN ) return 1 ;
2019-03-13 16:53:46 +00:00
if ( conn_sock1 = = - 1 ) goto co_error ;
conn_sock2 = dup ( conn_sock1 ) ;
if ( conn_sock2 = = - 1 ) goto co_error ;
//printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2);
url_get_port ( port , fdinfo - > url ) ;
to_fdinfo . fd = conn_sock1 ;
to_fdinfo . cat - > name = " tcp-read " ;
sprintf ( to_fdinfo . url , " tcp:read:127.0.0.1:%s " , port ) ;
evt_core_add_fd ( ctx , & to_fdinfo ) ;
to_fdinfo . fd = conn_sock2 ;
to_fdinfo . cat - > name = " tcp-write " ;
sprintf ( to_fdinfo . url , " tcp:write:127.0.0.1:%s " , port ) ;
evt_core_add_fd ( ctx , & to_fdinfo ) ;
2019-03-25 16:33:08 +00:00
return 0 ;
2019-03-13 16:53:46 +00:00
co_error :
perror ( " Failed to handle new connection " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-03-27 10:48:09 +00:00
void show_link_availability ( struct rr_ctx * rr ) {
printf ( " Links availability: my_links[ " ) ;
for ( int i = 0 ; i < 8 ; i + + ) {
if ( rr - > my_links & 1 < < i ) printf ( " U " ) ;
else printf ( " - " ) ;
}
printf ( " ], rem_links[ " ) ;
for ( int i = 0 ; i < 8 ; i + + ) {
if ( rr - > remote_links & 1 < < i ) printf ( " U " ) ;
else printf ( " - " ) ;
}
printf ( " ] \n " ) ;
}
2019-03-19 17:50:56 +00:00
int set_timeout ( struct evt_core_ctx * evts , uint64_t milli_sec , struct waited_pkt * wpkt ) {
2019-03-13 16:53:46 +00:00
struct timespec now ;
struct itimerspec timer_config ;
char url [ 1024 ] ;
struct evt_core_cat cat = { 0 } ;
struct evt_core_fdinfo fdinfo = { 0 } ;
fdinfo . cat = & cat ;
fdinfo . url = url ;
2019-03-19 19:48:07 +00:00
//printf("Will add a timeout of %ld ms\n", milli_sec);
2019-03-13 16:53:46 +00:00
if ( clock_gettime ( CLOCK_REALTIME , & now ) = = - 1 ) {
perror ( " clock_gettime " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-03-19 17:50:56 +00:00
uint64_t ns = now . tv_nsec + ( milli_sec % 1000 ) * 1000000 ;
timer_config . it_value . tv_sec = now . tv_sec + milli_sec / 1000 + ns / 1000000000 ;
timer_config . it_value . tv_nsec = ns % 1000000000 ;
2019-03-13 16:53:46 +00:00
timer_config . it_interval . tv_sec = 60 ;
timer_config . it_interval . tv_nsec = 0 ;
fdinfo . fd = timerfd_create ( CLOCK_REALTIME , 0 ) ;
if ( fdinfo . fd = = - 1 ) {
perror ( " Unable to timerfd_create " ) ;
exit ( EXIT_FAILURE ) ;
}
if ( timerfd_settime ( fdinfo . fd , TFD_TIMER_ABSTIME , & timer_config , NULL ) = = - 1 ) {
2019-03-19 17:50:56 +00:00
perror ( " Unable to timerfd_settime " ) ;
2019-03-13 16:53:46 +00:00
exit ( EXIT_FAILURE ) ;
}
fdinfo . cat - > name = " timeout " ;
2019-03-19 14:39:05 +00:00
fdinfo . other = wpkt ; // Should put the link number and the id
2019-03-13 16:53:46 +00:00
fdinfo . free_other = NULL ;
2019-03-19 17:50:56 +00:00
sprintf ( fdinfo . url , " timer:%ld:1 " , milli_sec ) ;
2019-03-13 16:53:46 +00:00
evt_core_add_fd ( evts , & fdinfo ) ;
2019-03-19 14:54:31 +00:00
2019-03-19 14:39:05 +00:00
return fdinfo . fd ;
2019-03-13 16:53:46 +00:00
}
2019-03-19 14:39:05 +00:00
void rr_pkt_register ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct buffer_packet * bp ) {
2019-03-13 16:53:46 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
2019-03-18 09:26:02 +00:00
struct rr_ctx * rr = app_ctx - > misc ;
2019-03-27 15:24:39 +00:00
char buffer [ 16 ] ;
url_get_port ( buffer , fdinfo - > url ) ;
int link_num = atoi ( buffer ) - 7500 ; // @FIXME Hardcoded
2019-03-27 16:46:10 +00:00
//printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id);
2019-03-13 16:53:46 +00:00
2019-03-27 15:24:39 +00:00
// 0. Update remote links
if ( ring_lt ( rr - > recv_id_late , bp - > ip . ap . str . id ) & & ! ( rr - > remote_links & 1 < < link_num ) ) {
2019-03-28 14:29:34 +00:00
printf ( " Activate link=%d | " , link_num ) ;
2019-03-27 15:24:39 +00:00
rr - > remote_links | = 1 < < link_num ; // Make sure that the link is marked as working
show_link_availability ( rr ) ;
}
2019-03-19 16:54:28 +00:00
2019-03-27 15:24:39 +00:00
// 1. Update my links I can use thanks to target feedback
2019-03-27 17:11:05 +00:00
if ( bp - > ip . ap . str . id > rr - > my_links_ver & & bp - > ip . ap . str . bitfield ! = rr - > my_links ) {
2019-03-18 16:58:40 +00:00
rr - > my_links = bp - > ip . ap . str . bitfield ;
rr - > my_links_ver = bp - > ip . ap . str . id ;
2019-03-28 14:29:34 +00:00
printf ( " Update my links | " ) ;
2019-03-27 17:11:05 +00:00
show_link_availability ( rr ) ;
2019-03-13 16:53:46 +00:00
}
2019-03-19 14:39:05 +00:00
// 2. If packet arrived too late, we discard it
2019-03-18 16:58:40 +00:00
if ( ring_gt ( rr - > recv_id , bp - > ip . ap . str . id - 1 ) ) {
2019-03-13 16:53:46 +00:00
// Packet has already been delivered or dropped, we free the buffer
2019-03-19 19:48:07 +00:00
fprintf ( stderr , " Packet %d arrived too late (current: %d) \n " , bp - > ip . ap . str . id , rr - > recv_id ) ;
2019-03-20 14:13:16 +00:00
mv_buffer_wtof ( app_ctx , fdinfo ) ;
2019-03-19 14:39:05 +00:00
return ;
2019-03-13 16:53:46 +00:00
}
2019-03-19 14:39:05 +00:00
// 3. If packet arrived too early, we register a timer
2019-03-20 09:28:52 +00:00
//printf("%d < %d = %d\n", rr->recv_id, bp->ip.ap.str.id - 1, ring_lt(rr->recv_id, bp->ip.ap.str.id - 1));
2019-03-18 16:58:40 +00:00
if ( ring_lt ( rr - > recv_id , bp - > ip . ap . str . id - 1 ) ) {
2019-03-18 09:26:02 +00:00
int64_t timeout = rr - > mjit - ( int64_t ) bp - > ip . ap . str . deltat ;
2019-03-19 19:48:07 +00:00
//printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout);
2019-03-13 16:53:46 +00:00
if ( timeout < = 0 ) timeout = 0 ;
2019-03-19 14:39:05 +00:00
int idx_waited = ( bp - > ip . ap . str . id - 1 ) % PACKET_BUFFER_SIZE ;
rr - > wait [ idx_waited ] . on = 1 ;
2019-03-19 20:15:51 +00:00
rr - > wait [ idx_waited ] . id = bp - > ip . ap . str . id - 1 ;
2019-03-19 14:39:05 +00:00
rr - > wait [ idx_waited ] . link_num = bp - > ip . ap . str . prevlink ;
2019-03-19 14:54:31 +00:00
rr - > wait [ idx_waited ] . timer_fd = set_timeout ( ctx , timeout , & rr - > wait [ idx_waited ] ) ;
2019-03-15 15:44:47 +00:00
}
2019-03-27 15:24:39 +00:00
// 4. If packet has not already a timer or has a wrong timer
2019-03-19 14:39:05 +00:00
int idx_real = bp - > ip . ap . str . id % PACKET_BUFFER_SIZE ;
2019-03-27 15:24:39 +00:00
if ( rr - > wait [ idx_real ] . on & & rr - > wait [ idx_real ] . id ! = bp - > ip . ap . str . id ) {
fprintf ( stderr , " Waiting array is full, BUG \n " ) ;
2019-03-27 17:27:44 +00:00
exit ( EXIT_FAILURE ) ;
2019-03-27 15:24:39 +00:00
} else if ( ! rr - > wait [ idx_real ] . on ) {
rr - > wait [ idx_real ] . on = 1 ;
rr - > wait [ idx_real ] . id = bp - > ip . ap . str . id ;
rr - > wait [ idx_real ] . link_num = link_num ;
rr - > wait [ idx_real ] . timer_fd = set_timeout ( ctx , rr - > mjit + 1 , & rr - > wait [ idx_real ] ) ;
}
// 5. We queue the packet
2019-03-27 17:27:44 +00:00
if ( rr - > real [ idx_real ] . on & & rr - > real [ idx_real ] . id ! = bp - > ip . ap . str . id ) {
2019-03-27 17:46:48 +00:00
fprintf ( stderr , " Real array is full for id=%d, idx=%d, BUG: [ \n " , bp - > ip . ap . str . id , idx_real ) ;
for ( int i = 0 ; i < PACKET_BUFFER_SIZE ; i + + ) {
printf ( " \t %d => %d \n " , i , rr - > real [ i ] . on ) ;
}
printf ( " ] \n " ) ;
2019-03-27 17:27:44 +00:00
exit ( EXIT_FAILURE ) ;
} else if ( ! rr - > real [ idx_real ] . on ) {
rr - > real [ idx_real ] . on = 1 ;
2019-03-27 17:46:48 +00:00
rr - > real [ idx_real ] . id = bp - > ip . ap . str . id ;
2019-03-27 17:27:44 +00:00
rr - > real [ idx_real ] . idx = idx_real ;
rr - > real [ idx_real ] . link_fd = fdinfo - > fd ;
mv_buffer_rtoa ( app_ctx , fdinfo , & rr - > real [ idx_real ] . idx ) ;
2019-03-27 17:46:48 +00:00
//printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real);
2019-03-27 17:27:44 +00:00
} else {
fprintf ( stdout , " Packet %d already received (current: %d) \n " , bp - > ip . ap . str . id , rr - > recv_id ) ;
mv_buffer_wtof ( app_ctx , fdinfo ) ;
}
2019-03-18 16:58:40 +00:00
}
2019-03-20 14:13:16 +00:00
void rr_deliver ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo , struct deferred_pkt * dp ) {
2019-03-18 16:58:40 +00:00
struct evt_core_fdinfo * to_fdinfo = NULL ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
2019-03-19 14:39:05 +00:00
struct rr_ctx * rr = app_ctx - > misc ;
2019-03-18 16:58:40 +00:00
char url [ 255 ] ;
2019-03-20 14:13:16 +00:00
// 1. Marked the packet as handled
dp - > on = 0 ;
2019-03-27 17:27:44 +00:00
// 2. Get the buffer and update rr state
2019-03-20 14:13:16 +00:00
struct buffer_packet * bp = get_app_buffer ( app_ctx , & dp - > idx ) ;
2019-03-27 17:27:44 +00:00
int idx_real = bp - > ip . ap . str . id % PACKET_BUFFER_SIZE ;
2019-03-27 17:29:19 +00:00
rr - > real [ idx_real ] . on = 0 ;
2019-03-27 17:46:48 +00:00
//printf("%d is removed from real as %d\n", bp->ip.ap.str.id, idx_real);
2019-03-27 16:46:10 +00:00
//printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id);
2019-03-19 16:54:28 +00:00
2019-03-20 14:13:16 +00:00
// 3. We update our cursor
2019-03-19 14:39:05 +00:00
rr - > recv_id = bp - > ip . ap . str . id ;
2019-03-27 16:46:10 +00:00
// 4. We free the buffer if it's a control packet and quit
2019-03-27 17:27:44 +00:00
/*if (bp->ip.ap.str.flags & PKT_CONTROL) {
2019-03-20 14:13:16 +00:00
mv_buffer_atof ( app_ctx , & dp - > idx ) ;
2019-03-19 14:39:05 +00:00
return ;
2019-03-27 17:27:44 +00:00
} */
2019-03-19 14:39:05 +00:00
2019-03-27 16:46:10 +00:00
// 5. Find its target
2019-03-18 16:58:40 +00:00
sprintf ( url , " udp:write:127.0.0.1:%d " , bp - > ip . ap . str . port ) ;
to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
if ( to_fdinfo = = NULL ) {
fprintf ( stderr , " No fd for URL %s in udp:write for tcp-read. Dropping packet :( \n " , url ) ;
2019-03-20 14:13:16 +00:00
//mv_buffer_wtor (app_ctx, fdinfo, bp);
mv_buffer_atof ( app_ctx , & dp - > idx ) ;
2019-03-18 16:58:40 +00:00
}
2019-03-27 16:46:10 +00:00
// 6. We move the buffer and notify the target
2019-03-20 14:13:16 +00:00
//mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp);
mv_buffer_atow ( app_ctx , & dp - > idx , to_fdinfo ) ;
2019-03-18 16:58:40 +00:00
rr_on_udp_write ( ctx , to_fdinfo ) ;
}
2019-03-19 14:39:05 +00:00
void rr_pkt_unroll ( struct evt_core_ctx * ctx , struct algo_ctx * app_ctx ) {
2019-03-18 16:58:40 +00:00
struct rr_ctx * rr = app_ctx - > misc ;
2019-03-19 14:39:05 +00:00
struct evt_core_fdinfo * fdinfo = NULL ;
struct buffer_packet * bp = NULL ;
while ( 1 ) {
2019-03-27 16:46:10 +00:00
//printf("Trying to deliver %d\n", rr->recv_id+1);
2019-03-19 14:39:05 +00:00
struct deferred_pkt * def = & rr - > real [ ( rr - > recv_id + 1 ) % PACKET_BUFFER_SIZE ] ;
2019-03-20 14:13:16 +00:00
if ( ! def - > on ) break ;
2019-03-19 14:39:05 +00:00
fdinfo = evt_core_get_from_fd ( ctx , def - > link_fd ) ;
if ( fdinfo = = NULL ) {
fprintf ( stderr , " An error occured as the link seems to be closed for the requested fd \n " ) ;
rr - > recv_id + + ;
continue ;
}
2019-03-20 14:13:16 +00:00
rr_deliver ( ctx , fdinfo , def ) ;
2019-03-27 16:46:10 +00:00
//printf("Delivered %d\n", rr->recv_id);
2019-03-19 14:39:05 +00:00
}
2019-03-13 16:53:46 +00:00
}
2019-03-19 15:35:11 +00:00
//------
2019-03-13 16:53:46 +00:00
int rr_on_tcp_read ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
2019-03-19 14:39:05 +00:00
struct rr_ctx * rr = app_ctx - > misc ;
2019-03-13 16:53:46 +00:00
int read_res = FDS_READY ;
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ( ( bp = get_read_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Try to read a whole packet in the buffer
while ( bp - > mode = = BP_READING ) {
read_res = read_packet_from_tcp ( fdinfo - > fd , bp ) ;
if ( read_res = = FDS_ERR ) goto co_error ;
if ( read_res = = FDS_AGAIN ) return 1 ;
}
// 3. Logic on packet
2019-03-19 14:39:05 +00:00
rr_pkt_register ( ctx , fdinfo , bp ) ;
rr_pkt_unroll ( ctx , app_ctx ) ;
2019-03-13 16:53:46 +00:00
return 0 ;
co_error :
perror ( " Failed to TCP read " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-03-18 16:58:40 +00:00
int rr_on_udp_write ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
int write_res = FDS_READY ;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ( ( bp = get_write_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Write buffer
write_res = write_packet_to_udp ( fdinfo - > fd , bp , fdinfo - > other ) ;
if ( write_res = = FDS_ERR ) goto co_error ;
if ( write_res = = FDS_AGAIN ) return 1 ;
// 3. A whole packet has been written
// Release the buffer and notify
2019-03-20 14:13:16 +00:00
mv_buffer_wtof ( app_ctx , fdinfo ) ;
2019-03-18 16:58:40 +00:00
notify_read ( ctx , app_ctx ) ;
return 0 ;
co_error :
perror ( " Failed to UDP write " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-03-15 15:44:47 +00:00
2019-03-19 15:35:11 +00:00
int rr_on_udp_read ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
2019-03-19 16:16:49 +00:00
struct evt_core_fdinfo * to_fdinfo = NULL ;
2019-03-19 15:35:11 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct rr_ctx * rr = app_ctx - > misc ;
int read_res = FDS_READY ;
char url [ 255 ] ;
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ( ( bp = get_read_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Read packet from socket
bp - > ip . ap . str . port = url_get_port_int ( fdinfo - > url ) ;
read_res = read_packet_from_udp ( fdinfo - > fd , bp , fdinfo - > other ) ;
if ( read_res = = FDS_ERR ) goto co_error ;
if ( read_res = = FDS_AGAIN ) return 1 ;
// 3. Prepare RR state and packet values
struct timespec curr ;
int secs , nsecs ;
2019-03-19 17:50:56 +00:00
uint64_t mili_sec ;
2019-03-19 15:35:11 +00:00
if ( clock_gettime ( CLOCK_MONOTONIC , & curr ) = = - 1 ) {
perror ( " clock_gettime error " ) ;
exit ( EXIT_FAILURE ) ;
}
secs = curr . tv_sec - rr - > emit_time . tv_sec ;
nsecs = curr . tv_nsec - rr - > emit_time . tv_nsec ;
2019-03-19 17:50:56 +00:00
mili_sec = secs * 1000 + nsecs / 1000000 ;
if ( mili_sec > rr - > mjit ) mili_sec = rr - > mjit ;
2019-03-19 15:35:11 +00:00
bp - > ip . ap . str . id = rr - > sent_id ;
bp - > ip . ap . str . flags = 0 ;
2019-03-19 17:50:56 +00:00
bp - > ip . ap . str . deltat = mili_sec ;
2019-03-19 15:35:11 +00:00
bp - > ip . ap . str . bitfield = rr - > remote_links ;
bp - > ip . ap . str . prevlink = rr - > current_link ;
2019-03-27 16:46:10 +00:00
//printf("Will send packet id=%d\n", bp->ip.ap.str.id);
2019-03-19 15:35:11 +00:00
rr - > emit_time = curr ;
rr - > sent_id + + ;
2019-03-27 15:24:39 +00:00
int max = 10 ;
uint8_t sel_link = rr - > current_link ;
while ( max - - > = 0 ) {
sel_link = ( sel_link + 1 ) % 8 ;
sprintf ( url , " tcp:write:127.0.0.1:%d " , 7500 + sel_link ) ; //@FIXME Hardcoded
to_fdinfo = evt_core_get_from_url ( ctx , url ) ;
if ( to_fdinfo = = NULL ) continue ; // Missing link
2019-03-28 14:58:18 +00:00
if ( app_ctx - > ap . is_waiting_bootstrap & & ! app_ctx - > is_rdy ) goto not_ready ; // Some links are down
if ( ! app_ctx - > ap . is_healing | | rr - > my_links & ( 1 < < sel_link ) ) {
2019-03-27 15:24:39 +00:00
rr - > current_link = sel_link ;
mv_buffer_rtow ( app_ctx , fdinfo , to_fdinfo ) ;
rr_on_tcp_write ( ctx , to_fdinfo ) ;
return 0 ;
} else {
dup_buffer_tow ( app_ctx , bp , to_fdinfo ) ;
rr_on_tcp_write ( ctx , to_fdinfo ) ;
}
2019-03-19 15:35:11 +00:00
}
2019-03-28 14:26:33 +00:00
not_ready :
2019-03-27 15:24:39 +00:00
// 4. A whole packet has been read, we will find someone to write it
2019-03-28 14:26:33 +00:00
fprintf ( stderr , " Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n " , fdinfo - > url ) ;
2019-03-27 15:24:39 +00:00
mv_buffer_wtof ( app_ctx , fdinfo ) ;
2019-03-19 15:35:11 +00:00
return 0 ;
co_error :
perror ( " Failed to UDP read " ) ;
exit ( EXIT_FAILURE ) ;
}
int rr_on_tcp_write ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
2019-03-28 14:26:33 +00:00
struct rr_ctx * rr = app_ctx - > misc ;
2019-03-19 15:35:11 +00:00
int write_res = FDS_READY ;
2019-03-28 14:26:33 +00:00
// 0. Show some information about circuits
2019-03-28 14:58:18 +00:00
uint8_t is_rdy = fdinfo - > cat - > socklist - > len > = app_ctx - > link_count ? 1 : 0 ;
if ( ! app_ctx - > is_rdy & & is_rdy ) printf ( " === Our %d requested circuits are now up === \n " , app_ctx - > link_count ) ;
else if ( app_ctx - > is_rdy & & ! is_rdy ) printf ( " === Only %d/%d circuits are available, results could be biased === \n " , fdinfo - > cat - > socklist - > len , app_ctx - > link_count ) ;
app_ctx - > is_rdy = is_rdy ;
2019-03-28 14:26:33 +00:00
2019-03-19 15:35:11 +00:00
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ( ( bp = get_write_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Write data from the buffer to the socket
while ( bp - > mode = = BP_WRITING ) {
write_res = write_packet_to_tcp ( fdinfo - > fd , bp ) ;
if ( write_res = = FDS_ERR ) goto co_error ;
if ( write_res = = FDS_AGAIN ) return 1 ;
}
// 3. A whole packet has been written
// Release the buffer and notify
2019-03-20 14:13:16 +00:00
mv_buffer_wtof ( app_ctx , fdinfo ) ;
2019-03-19 15:35:11 +00:00
notify_read ( ctx , app_ctx ) ;
return 0 ;
co_error :
perror ( " Failed to TCP write " ) ;
exit ( EXIT_FAILURE ) ;
}
2019-03-19 12:50:38 +00:00
int rr_on_timer ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
2019-03-27 16:31:46 +00:00
uint64_t ctr ;
ssize_t tmr_rd ;
tmr_rd = read ( fdinfo - > fd , & ctr , sizeof ( ctr ) ) ;
if ( tmr_rd = = - 1 & & errno = = EAGAIN ) return 1 ;
if ( tmr_rd < 0 ) {
perror ( " read on timer " ) ;
fprintf ( stderr , " An error occured on timer fd=%d \n " , fdinfo - > fd ) ;
exit ( EXIT_FAILURE ) ;
}
2019-03-19 12:50:38 +00:00
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct rr_ctx * rr = app_ctx - > misc ;
2019-03-19 14:54:31 +00:00
struct waited_pkt * pkt = fdinfo - > other ;
2019-03-19 20:21:00 +00:00
pkt - > on = 0 ;
2019-03-27 15:24:39 +00:00
if ( ring_gt ( pkt - > id , rr - > recv_id_late ) ) rr - > recv_id_late = pkt - > id ;
2019-03-27 16:46:10 +00:00
if ( ring_le ( pkt - > id , rr - > recv_id ) ) goto end ;
2019-03-19 20:06:01 +00:00
printf ( " Timer reached for packet %d \n " , pkt - > id ) ;
2019-03-19 12:50:38 +00:00
2019-03-20 09:28:52 +00:00
// !BLACKLIST LINK
2019-03-28 14:29:34 +00:00
printf ( " Blacklist link=%d | " , pkt - > link_num ) ;
2019-03-27 10:19:05 +00:00
rr - > remote_links & = 0xff ^ 1 < < pkt - > link_num ;
2019-03-27 13:17:41 +00:00
show_link_availability ( rr ) ;
2019-03-20 09:28:52 +00:00
2019-03-20 08:08:32 +00:00
while ( ring_lt ( rr - > recv_id , pkt - > id ) ) {
rr - > recv_id + + ;
rr_pkt_unroll ( ctx , app_ctx ) ;
}
2019-03-19 14:39:05 +00:00
2019-03-27 16:46:10 +00:00
end :
2019-03-27 16:31:46 +00:00
evt_core_rm_fd ( ctx , fdinfo - > fd ) ;
2019-03-19 12:50:38 +00:00
return 1 ;
}
2019-03-19 09:00:03 +00:00
int rr_on_err ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct buffer_packet * bp ;
// 1. If has a "used" buffer, remove it
bp = g_hash_table_lookup ( app_ctx - > used_buffer , & ( fdinfo - > fd ) ) ;
if ( bp ! = NULL ) {
g_hash_table_remove ( app_ctx - > used_buffer , & ( fdinfo - > fd ) ) ;
memset ( bp , 0 , sizeof ( struct buffer_packet ) ) ;
g_queue_push_tail ( app_ctx - > free_buffer , bp ) ;
}
// 2. If appears in the write waiting queue, remove it
GQueue * writew = g_hash_table_lookup ( app_ctx - > write_waiting , & ( fdinfo - > fd ) ) ;
while ( writew ! = NULL & & ( bp = g_queue_pop_head ( writew ) ) ! = NULL ) {
memset ( bp , 0 , sizeof ( struct buffer_packet ) ) ;
g_queue_push_tail ( app_ctx - > free_buffer , bp ) ;
}
g_hash_table_remove ( app_ctx - > write_waiting , & ( fdinfo - > fd ) ) ;
// 3. If appears in the read waiting queue, remove it
g_queue_remove_all ( app_ctx - > read_waiting , & ( fdinfo - > fd ) ) ;
return 0 ;
}
2019-03-28 10:54:01 +00:00
void algo_rr ( struct evt_core_ctx * evt , struct algo_skel * as , struct algo_params * ap ) {
2019-03-07 15:57:02 +00:00
struct algo_ctx * ctx = malloc ( sizeof ( struct algo_ctx ) ) ;
if ( ctx = = NULL ) goto init_err ;
memset ( ctx , 0 , sizeof ( struct algo_ctx ) ) ;
ctx - > free_buffer = g_queue_new ( ) ;
ctx - > read_waiting = g_queue_new ( ) ;
2019-03-20 14:13:16 +00:00
ctx - > application_waiting = g_hash_table_new ( NULL , NULL ) ;
2019-03-07 15:57:02 +00:00
ctx - > used_buffer = g_hash_table_new ( g_int_hash , g_int_equal ) ;
ctx - > write_waiting = g_hash_table_new_full ( g_int_hash , g_int_equal , NULL , naive_free_simple ) ;
2019-03-28 14:58:18 +00:00
ctx - > link_count = 8 ;
ctx - > is_rdy = 0 ;
ctx - > ap = * ap ;
2019-03-18 09:26:02 +00:00
struct rr_ctx * rr = malloc ( sizeof ( struct rr_ctx ) ) ;
if ( rr = = NULL ) goto init_err ;
memset ( rr , 0 , sizeof ( struct rr_ctx ) ) ;
rr - > mjit = 200 ;
rr - > my_links = 0xff ;
rr - > remote_links = 0xff ;
2019-03-19 16:54:28 +00:00
rr - > sent_id = 1 ;
2019-03-27 15:24:39 +00:00
rr - > recv_id = 0 ;
rr - > recv_id_late = 0 ;
2019-03-18 09:26:02 +00:00
ctx - > misc = rr ;
2019-03-07 15:57:02 +00:00
for ( int i = 0 ; i < sizeof ( ctx - > bps ) / sizeof ( ctx - > bps [ 0 ] ) ; i + + ) {
g_queue_push_tail ( ctx - > free_buffer , & ( ctx - > bps [ i ] ) ) ;
}
2019-03-19 09:00:03 +00:00
2019-03-07 15:57:02 +00:00
as - > on_tcp_co . name = " tcp-listen " ;
as - > on_tcp_co . flags = EPOLLIN ;
as - > on_tcp_co . free_app_ctx = free_nothing ;
2019-03-19 09:00:03 +00:00
as - > on_tcp_co . cb = rr_on_tcp_co ;
2019-03-07 15:57:02 +00:00
as - > on_tcp_read . name = " tcp-read " ;
as - > on_tcp_read . flags = EPOLLIN | EPOLLET | EPOLLRDHUP ;
as - > on_tcp_read . app_ctx = ctx ;
as - > on_tcp_read . free_app_ctx = free_naive ;
2019-03-19 09:00:03 +00:00
as - > on_tcp_read . cb = rr_on_tcp_read ;
as - > on_tcp_read . err_cb = rr_on_err ;
2019-03-07 15:57:02 +00:00
ctx - > ref_count + + ;
2019-03-19 12:50:38 +00:00
2019-03-19 15:35:11 +00:00
2019-03-07 15:57:02 +00:00
as - > on_udp_read . name = " udp-read " ;
as - > on_udp_read . flags = EPOLLIN | EPOLLET ;
as - > on_udp_read . app_ctx = ctx ;
as - > on_udp_read . free_app_ctx = free_naive ;
2019-03-19 15:35:11 +00:00
as - > on_udp_read . cb = rr_on_udp_read ;
as - > on_udp_read . err_cb = rr_on_err ;
2019-03-07 15:57:02 +00:00
ctx - > ref_count + + ;
as - > on_tcp_write . name = " tcp-write " ;
as - > on_tcp_write . flags = EPOLLOUT | EPOLLET | EPOLLRDHUP ;
as - > on_tcp_write . app_ctx = ctx ;
as - > on_tcp_write . free_app_ctx = free_naive ;
2019-03-19 15:35:11 +00:00
as - > on_tcp_write . cb = rr_on_tcp_write ;
as - > on_tcp_write . err_cb = rr_on_err ;
2019-03-07 15:57:02 +00:00
ctx - > ref_count + + ;
as - > on_udp_write . name = " udp-write " ;
as - > on_udp_write . flags = EPOLLOUT | EPOLLET ;
as - > on_udp_write . app_ctx = ctx ;
as - > on_udp_write . free_app_ctx = free_naive ;
2019-03-19 09:00:03 +00:00
as - > on_udp_write . cb = rr_on_udp_write ;
as - > on_udp_write . err_cb = rr_on_err ;
2019-03-07 15:57:02 +00:00
ctx - > ref_count + + ;
2019-03-19 09:00:03 +00:00
struct evt_core_cat tcat = {
2019-03-19 12:50:38 +00:00
. name = " timeout " ,
. flags = EPOLLIN | EPOLLET ,
. app_ctx = ctx ,
. free_app_ctx = free_naive ,
. cb = rr_on_timer ,
. err_cb = NULL
2019-03-19 09:00:03 +00:00
} ;
2019-03-19 12:50:38 +00:00
ctx - > ref_count + + ;
2019-03-19 09:00:03 +00:00
evt_core_add_cat ( evt , & tcat ) ;
2019-03-07 15:57:02 +00:00
return ;
init_err :
fprintf ( stderr , " Failed to init algo naive \n " ) ;
exit ( EXIT_FAILURE ) ;
}