2019-04-24 14:23:41 +00:00
# include "proxy.h"
int main_on_tcp_co ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
int conn_sock1 , conn_sock2 ;
struct sockaddr_in addr ;
socklen_t in_len ;
char url [ 1024 ] , port [ 6 ] ;
struct evt_core_cat local_cat = { 0 } ;
struct evt_core_fdinfo to_fdinfo = { 0 } ;
to_fdinfo . cat = & local_cat ;
to_fdinfo . url = url ;
in_len = sizeof ( addr ) ;
conn_sock1 = accept ( fdinfo - > fd , ( struct sockaddr * ) & addr , & in_len ) ;
if ( conn_sock1 = = - 1 & & errno = = EAGAIN ) return 1 ;
if ( conn_sock1 = = - 1 ) goto co_error ;
conn_sock2 = dup ( conn_sock1 ) ;
if ( conn_sock2 = = - 1 ) goto co_error ;
//printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2);
url_get_port ( port , fdinfo - > url ) ;
to_fdinfo . fd = conn_sock1 ;
to_fdinfo . cat - > name = " tcp-read " ;
sprintf ( to_fdinfo . url , " tcp:read:127.0.0.1:%s " , port ) ;
evt_core_add_fd ( ctx , & to_fdinfo ) ;
to_fdinfo . fd = conn_sock2 ;
to_fdinfo . cat - > name = " tcp-write " ;
sprintf ( to_fdinfo . url , " tcp:write:127.0.0.1:%s " , port ) ;
evt_core_add_fd ( ctx , & to_fdinfo ) ;
return 0 ;
co_error :
perror ( " Failed to handle new connection " ) ;
exit ( EXIT_FAILURE ) ;
}
int main_on_tcp_read ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
int read_res = FDS_READY ;
2019-05-24 09:04:37 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later \n " ) ;
2019-04-24 14:23:41 +00:00
if ( ( bp = get_read_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
2019-05-24 09:04:37 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [proxy] Try to read a whole packet in the buffer \n " ) ;
2019-04-24 14:23:41 +00:00
while ( bp - > mode = = BP_READING ) {
read_res = read_packet_from_tcp ( fdinfo - > fd , bp ) ;
if ( read_res = = FDS_ERR ) goto co_error ;
if ( read_res = = FDS_AGAIN ) return 1 ;
}
2019-05-24 09:04:37 +00:00
if ( ctx - > verbose > 1 ) fprintf ( stderr , " [proxy] Call logic on packet \n " ) ;
2019-04-24 14:23:41 +00:00
return app_ctx - > desc - > on_stream ( ctx , fdinfo , bp ) ;
co_error :
perror ( " Failed to TCP read " ) ;
exit ( EXIT_FAILURE ) ;
}
int main_on_udp_read ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
int read_res = FDS_READY ;
char url [ 255 ] ;
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ( ( bp = get_read_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Read packet from socket
2019-05-14 09:23:23 +00:00
bp - > ip . ap . fmt . content . clear . port = url_get_port_int ( fdinfo - > url ) ;
2019-04-24 14:23:41 +00:00
read_res = read_packet_from_udp ( fdinfo - > fd , bp , fdinfo - > other ) ;
if ( read_res = = FDS_ERR ) goto co_error ;
if ( read_res = = FDS_AGAIN ) return 1 ;
// 3. Apply logic
return app_ctx - > desc - > on_datagram ( ctx , fdinfo , bp ) ;
co_error :
perror ( " Failed to UDP read " ) ;
exit ( EXIT_FAILURE ) ;
}
int main_on_tcp_write ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct rr_ctx * rr = app_ctx - > misc ;
int write_res = FDS_READY ;
// 0. Show some information about circuits
uint8_t is_rdy = fdinfo - > cat - > socklist - > len > = app_ctx - > link_count ? 1 : 0 ;
if ( ! app_ctx - > is_rdy & & is_rdy ) printf ( " === Our %d requested circuits are now up === \n " , app_ctx - > link_count ) ;
else if ( app_ctx - > is_rdy & & ! is_rdy ) printf ( " === Only %d/%d circuits are available, results could be biased === \n " , fdinfo - > cat - > socklist - > len , app_ctx - > link_count ) ;
app_ctx - > is_rdy = app_ctx - > is_rdy | | is_rdy ; // @FIXME prevent deactivation for our tests
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ( ( bp = get_write_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Write data from the buffer to the socket
while ( bp - > mode = = BP_WRITING ) {
write_res = write_packet_to_tcp ( fdinfo - > fd , bp ) ;
if ( write_res = = FDS_ERR ) goto co_error ;
if ( write_res = = FDS_AGAIN ) return 1 ;
}
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof ( app_ctx , fdinfo ) ;
notify_read ( ctx , app_ctx ) ;
return 0 ;
co_error :
perror ( " Failed to TCP write " ) ;
exit ( EXIT_FAILURE ) ;
}
int main_on_udp_write ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct buffer_packet * bp ;
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
int write_res = FDS_READY ;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ( ( bp = get_write_buffer ( app_ctx , fdinfo ) ) = = NULL ) return 1 ;
// 2. Write buffer
write_res = write_packet_to_udp ( fdinfo - > fd , bp , fdinfo - > other ) ;
if ( write_res = = FDS_ERR ) goto co_error ;
if ( write_res = = FDS_AGAIN ) return 1 ;
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof ( app_ctx , fdinfo ) ;
notify_read ( ctx , app_ctx ) ;
return 0 ;
co_error :
perror ( " Failed to UDP write " ) ;
exit ( EXIT_FAILURE ) ;
}
int main_on_err ( struct evt_core_ctx * ctx , struct evt_core_fdinfo * fdinfo ) {
struct algo_ctx * app_ctx = fdinfo - > cat - > app_ctx ;
struct buffer_packet * bp ;
// 1. If has a "used" buffer, remove it
bp = g_hash_table_lookup ( app_ctx - > used_buffer , & ( fdinfo - > fd ) ) ;
if ( bp ! = NULL ) {
g_hash_table_remove ( app_ctx - > used_buffer , & ( fdinfo - > fd ) ) ;
memset ( bp , 0 , sizeof ( struct buffer_packet ) ) ;
g_queue_push_tail ( app_ctx - > free_buffer , bp ) ;
}
// 2. If appears in the write waiting queue, remove it
GQueue * writew = g_hash_table_lookup ( app_ctx - > write_waiting , & ( fdinfo - > fd ) ) ;
while ( writew ! = NULL & & ( bp = g_queue_pop_head ( writew ) ) ! = NULL ) {
memset ( bp , 0 , sizeof ( struct buffer_packet ) ) ;
g_queue_push_tail ( app_ctx - > free_buffer , bp ) ;
}
if ( writew ) g_hash_table_remove ( app_ctx - > write_waiting , & ( fdinfo - > fd ) ) ;
// 3. If appears in the read waiting queue, remove it
g_queue_remove_all ( app_ctx - > read_waiting , & ( fdinfo - > fd ) ) ;
return app_ctx - > desc - > on_err ( ctx , fdinfo ) ;
}
void algo_main_init ( struct evt_core_ctx * evt , struct algo_params * ap ) {
struct algo_ctx * ctx = malloc ( sizeof ( struct algo_ctx ) ) ;
if ( ctx = = NULL ) goto init_err ;
memset ( ctx , 0 , sizeof ( struct algo_ctx ) ) ;
ctx - > free_buffer = g_queue_new ( ) ;
ctx - > read_waiting = g_queue_new ( ) ;
ctx - > application_waiting = g_hash_table_new ( NULL , NULL ) ;
ctx - > used_buffer = g_hash_table_new ( g_int_hash , g_int_equal ) ;
ctx - > write_waiting = g_hash_table_new_full ( g_int_hash , g_int_equal , NULL , naive_free_simple ) ;
ctx - > link_count = 8 ;
ctx - > is_rdy = 0 ;
ctx - > ap = * ap ;
for ( int i = 0 ; i < sizeof ( ctx - > bps ) / sizeof ( ctx - > bps [ 0 ] ) ; i + + ) {
2019-05-24 09:25:31 +00:00
ctx - > bps [ i ] . mode = BP_READING ;
ctx - > bps [ i ] . aread = 0 ;
ctx - > bps [ i ] . ap_count = 0 ;
2019-04-24 14:23:41 +00:00
g_queue_push_tail ( ctx - > free_buffer , & ( ctx - > bps [ i ] ) ) ;
}
struct evt_core_cat tcp_listen = {
. name = " tcp-listen " ,
. flags = EPOLLIN ,
. app_ctx = ctx ,
2019-05-09 09:39:03 +00:00
. free_app_ctx = free_naive ,
2019-04-24 14:23:41 +00:00
. cb = main_on_tcp_co ,
. err_cb = NULL
} ;
2019-05-09 09:24:05 +00:00
ctx - > ref_count + + ;
2019-04-24 14:23:41 +00:00
evt_core_add_cat ( evt , & tcp_listen ) ;
struct evt_core_cat tcp_read = {
. name = " tcp-read " ,
. flags = EPOLLIN | EPOLLET | EPOLLRDHUP ,
. app_ctx = ctx ,
. free_app_ctx = free_naive ,
. cb = main_on_tcp_read ,
. err_cb = main_on_err
} ;
ctx - > ref_count + + ;
evt_core_add_cat ( evt , & tcp_read ) ;
struct evt_core_cat udp_read = {
. name = " udp-read " ,
. flags = EPOLLIN | EPOLLET | EPOLLRDHUP ,
. app_ctx = ctx ,
. free_app_ctx = free_naive ,
. cb = main_on_udp_read ,
. err_cb = main_on_err
} ;
ctx - > ref_count + + ;
evt_core_add_cat ( evt , & udp_read ) ;
struct evt_core_cat tcp_write = {
. name = " tcp-write " ,
. flags = EPOLLOUT | EPOLLET | EPOLLRDHUP ,
. app_ctx = ctx ,
. free_app_ctx = free_naive ,
. cb = main_on_tcp_write ,
. err_cb = main_on_err
} ;
ctx - > ref_count + + ;
evt_core_add_cat ( evt , & tcp_write ) ;
struct evt_core_cat udp_write = {
. name = " udp-write " ,
. flags = EPOLLOUT | EPOLLET ,
. app_ctx = ctx ,
. free_app_ctx = free_naive ,
. cb = main_on_udp_write ,
. err_cb = main_on_err
} ;
ctx - > ref_count + + ;
evt_core_add_cat ( evt , & udp_write ) ;
for ( int i = 0 ; i < sizeof ( available_algo ) / sizeof ( available_algo [ 0 ] ) ; i + + ) {
if ( strcmp ( available_algo [ i ] . name , ap - > algo_name ) = = 0 ) {
ctx - > desc = & ( available_algo [ i ] ) ;
ctx - > desc - > init ( evt , ctx , ap ) ;
return ;
}
}
fprintf ( stderr , " Algorithm %s has not been found \n " , ap - > algo_name ) ;
exit ( EXIT_FAILURE ) ;
init_err :
fprintf ( stderr , " Failed to init proxy \n " ) ;
exit ( EXIT_FAILURE ) ;
}