diff --git a/CMakeLists.txt b/CMakeLists.txt index 3bab5bd..59faa1e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,8 @@ add_library(worker_lib STATIC src/worker.c) add_library(properties_lib STATIC src/properties.c) add_library(poll_array_lib STATIC src/data_structures/poll_array.c) add_library(marked_fds_lib STATIC src/data_structures/marked_fds.c) +add_library(session_lib STATIC src/data_structures/session.c) +add_library(current_session_lib STATIC src/data_structures/current_session.c) add_library(buffer_lib STATIC src/data_structures/buffer.c) add_library(http_request_lib STATIC src/http/request.c) add_library(http_response_lib STATIC src/http/response.c) @@ -44,7 +46,6 @@ add_library(ssl_utils_lib STATIC src/utils/ssl.c) add_library(db_migration_utils_lib STATIC src/utils/db_migration.c) add_library(signal_wrapper_lib STATIC src/shutdown/signal2.c) add_library(stop_lib STATIC src/shutdown/stop.c) -add_library(session_lib STATIC src/data_structures/session.c) # Handlers libraries @@ -63,18 +64,18 @@ add_library(simulator_http_lib STATIC test/simulator/http_cases.c) add_executable(main src/main.c ${HEADER_FILES}) -target_link_libraries(main PRIVATE listener_lib worker_lib builder_lib loader_lib parser_lib sender_lib http_request_lib http_response_lib socket_lib signal_wrapper_lib poll_array_lib db_migration_utils_lib file_utils_lib compression_utils_lib regex_utils_lib hash_utils_lib ssl_utils_lib assert_utils_lib logger_utils_lib session_lib marked_fds_lib stop_lib properties_lib static_lib buffer_lib OpenSSL::SSL OpenSSL::Crypto Threads::Threads ZLIB::ZLIB SQLite::SQLite3) +target_link_libraries(main PRIVATE listener_lib worker_lib builder_lib loader_lib parser_lib sender_lib http_request_lib http_response_lib socket_lib signal_wrapper_lib poll_array_lib db_migration_utils_lib file_utils_lib compression_utils_lib regex_utils_lib hash_utils_lib ssl_utils_lib session_lib assert_utils_lib logger_utils_lib current_session_lib marked_fds_lib stop_lib properties_lib static_lib buffer_lib OpenSSL::SSL OpenSSL::Crypto Threads::Threads ZLIB::ZLIB SQLite::SQLite3) enable_compile_options(main) #enable_address_sanitizer(main) add_executable(client test/simulator/client.c) -target_link_libraries(client PRIVATE simulator_cases_lib simulator_http_lib simulator_connection_lib socket_lib compression_utils_lib hash_utils_lib assert_utils_lib logger_utils_lib session_lib stop_lib marked_fds_lib properties_lib static_lib buffer_lib OpenSSL::SSL OpenSSL::Crypto Threads::Threads ZLIB::ZLIB CURL::libcurl SQLite::SQLite3) +target_link_libraries(client PRIVATE simulator_cases_lib simulator_http_lib simulator_connection_lib socket_lib compression_utils_lib hash_utils_lib session_lib assert_utils_lib logger_utils_lib stop_lib current_session_lib marked_fds_lib properties_lib static_lib buffer_lib OpenSSL::SSL OpenSSL::Crypto Threads::Threads ZLIB::ZLIB CURL::libcurl SQLite::SQLite3) enable_compile_options(client) # enable_address_sanitizer(client) add_executable(unit_test test/unit_tests/all_tests.c) -target_link_libraries(unit_test PRIVATE http_request_lib http_response_lib file_utils_lib regex_utils_lib poll_array_lib compression_utils_lib hash_utils_lib properties_lib assert_utils_lib logger_utils_lib session_lib stop_lib marked_fds_lib properties_lib static_lib buffer_lib OpenSSL::SSL OpenSSL::Crypto Threads::Threads ZLIB::ZLIB) +target_link_libraries(unit_test PRIVATE http_request_lib http_response_lib file_utils_lib regex_utils_lib poll_array_lib compression_utils_lib hash_utils_lib properties_lib session_lib assert_utils_lib logger_utils_lib stop_lib current_session_lib marked_fds_lib properties_lib static_lib buffer_lib OpenSSL::SSL OpenSSL::Crypto Threads::Threads ZLIB::ZLIB) enable_compile_options(unit_test) #enable_address_sanitizer(unit_test) # Add custom target to copy the static folder diff --git a/README.md b/README.md index a964c59..76397fb 100644 --- a/README.md +++ b/README.md @@ -122,11 +122,9 @@ enable_https = 0 (binary value for enabling HTTPS) * Performance optimization (spacial locality?) * A thread/path that listens to console/http input and can send commands to the server (e.g. reload config, shutdown, etc.) * Basic Auth -* One thread per request (Apache) -* One list per state * ENSURE_CAPACITY use assert for checking for memory issues... * Request/response logger -* Move session arrays to one struct +* HTTP/JSON parser ## Try to remember * Clean up includes diff --git a/src/data_structures/current_session.c b/src/data_structures/current_session.c new file mode 100644 index 0000000..7e9cf47 --- /dev/null +++ b/src/data_structures/current_session.c @@ -0,0 +1,7 @@ +#include "current_session.h" + +static __thread int thread_session_id = -1; + +void put_current_session(int session_id) { thread_session_id = session_id; } +int get_current_session() { return thread_session_id; } +void clear_current_session() { thread_session_id = -1; } diff --git a/src/data_structures/current_session.h b/src/data_structures/current_session.h new file mode 100644 index 0000000..d4d108b --- /dev/null +++ b/src/data_structures/current_session.h @@ -0,0 +1,8 @@ +#ifndef CURRENT_SESSION_H +#define CURRENT_SESSION_H + +void put_current_session(int session_id); +int get_current_session(); +void clear_current_session(); + +#endif // CURRENT_SESSION_H diff --git a/src/data_structures/map_impl.c b/src/data_structures/map_impl.c new file mode 100644 index 0000000..1a8b6f0 --- /dev/null +++ b/src/data_structures/map_impl.c @@ -0,0 +1,123 @@ +#include "session_by_thread_map.h" +#include +#include +#include +#include +#include + +#define TABLE_MAX_LOAD 0.75 + +typedef struct { + unsigned long thread_id; + int session_id; +} entry_t; + +typedef struct { + unsigned int count; + unsigned int capacity; + entry_t *entries; +} table_t; + +static table_t table = { + .count = 0, + .capacity = 0, + .entries = NULL, +}; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +static entry_t *find_entry(entry_t *entries, unsigned long thread_id); +static int grow_capacity(); + +void destroy_session_by_thread_map() { + pthread_mutex_lock(&mutex); + if (table.entries != NULL) { + free(table.entries); + table.entries = NULL; + } + table.capacity = 0; + table.count = 0; + pthread_mutex_unlock(&mutex); +} + +int put_session_id_by_thread(int session_id) { + pthread_mutex_lock(&mutex); + + if (table.count + 1 > table.capacity * TABLE_MAX_LOAD) { + int result = grow_capacity(); + if (result < 0) { + pthread_mutex_unlock(&mutex); + return result; + } + } + unsigned long thread_id = pthread_self(); + entry_t *entry = find_entry(table.entries, thread_id); + bool is_new_entry = entry->session_id == -1; + if (session_id == -1) + table.count -= 1; + else if (is_new_entry) + table.count += 1; + + entry->session_id = session_id; + entry->thread_id = thread_id; + pthread_mutex_unlock(&mutex); + return 1; +} + +int get_session_id_by_thread() { + pthread_mutex_lock(&mutex); + int result = -1; + + if (table.count > 0) { + result = find_entry(table.entries, pthread_self())->session_id; + } + + pthread_mutex_unlock(&mutex); + return result; +} + +static entry_t *find_entry(entry_t *entries, unsigned long thread_id) { + unsigned int index = thread_id % table.capacity; + + while (1) { + entry_t *entry = &table.entries[index]; + if (entry->thread_id == thread_id || entry->session_id == -1) { + return entry; + } + + index = (index + 1) % table.capacity; + } +} + +static int grow_capacity() { + unsigned int new_capacity; + if (table.capacity == 0) { + new_capacity = 16; + } else { + new_capacity *= 2; + } + + entry_t *new_entries = malloc(sizeof(entry_t) * new_capacity); + if (new_entries == NULL) { + return -1; + } + + for (int i = 0; i < new_capacity; i++) { + new_entries[i].session_id = -1; + new_entries[i].thread_id = -1; + } + + for (int i = 0; i < table.capacity; i++) { + entry_t *entry = &table.entries[i]; + if (entry->session_id == -1) + continue; + + entry_t *dest = find_entry(new_entries, entry->thread_id); + dest->thread_id = entry->thread_id; + dest->session_id = entry->session_id; + } + + free(table.entries); + table.entries = new_entries; + table.capacity = new_capacity; + return table.capacity; +} diff --git a/src/data_structures/session.c b/src/data_structures/session.c index 952c83d..e49dca4 100644 --- a/src/data_structures/session.c +++ b/src/data_structures/session.c @@ -1,20 +1,23 @@ #include "session.h" #include "../properties.h" #include "../shutdown/stop.h" +#include "../utils/assert2.h" #include "buffer.h" #include "marked_fds.h" -#include #include -#include +#include -static struct session **session_array = NULL; +static session_node_t *session_node_array = NULL; // Eight states (8 bits) -static char **status_array = NULL; -static buffer **buffer_array = NULL; -static http_request_t **request_array = NULL; -static http_response_t **response_array = NULL; -static SSL **ssl_array = NULL; -static BIO **bio_array = NULL; +// static char **status_array = NULL; + +// Each state is represented by a linked list +static session_node_t *ll_ready_to_read = NULL; +static session_node_t *ll_has_been_read = NULL; +static session_node_t *ll_parsed = NULL; +static session_node_t *ll_data_fetched = NULL; +static session_node_t *ll_ready_to_send = NULL; + static int max_size = 0; static unsigned int session_count = 0; static unsigned int session_last_index = 0; @@ -24,109 +27,43 @@ static pthread_cond_t parsed_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t data_fetched_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t response_generated_cond = PTHREAD_COND_INITIALIZER; -static void assert_(const char *file, int line, const char *func, - const char *msg) { - fprintf(stdout, "Assertion failed: %s:%d: %s: %s\n", file, line, func, msg); - - void *backtrace_buffer[BACKTRACE_SIZE]; - unsigned int backtrace_size = backtrace(backtrace_buffer, BACKTRACE_SIZE); - char **backtrace_symbols_buffer = - backtrace_symbols(backtrace_buffer, backtrace_size); - if (backtrace_symbols_buffer != NULL && backtrace_size > 0) { - printf("Backtrace:\n"); - for (unsigned int i = 0; i < backtrace_size; i++) { - printf(" %d: %s\n", i, backtrace_symbols_buffer[i]); - } - free(backtrace_symbols_buffer); - } - - raise(SIGABRT); -} -#define assert(expr) \ - ((void)((expr) || (assert_(__FILE__, __LINE__, __func__, #expr), 0))) +static inline int init_session(int related_fd); +static inline void deinit_session(int i); +static inline int init_ssl_session(int index); +static inline void reset_session_at_index(int index); +static inline void reset_request_at_index(int index); +static void append_linked_list(session_node_t **ll, session_node_t *node); +static session_node_t *pop_linked_list(session_node_t **ll); +static session_node_t *splice_by_fd_linked_list(session_node_t **ll, int fd); int init_session_cache(SSL_CTX *ctx) { SSL_library_init(); - assert(session_array == NULL); + assert(session_node_array == NULL); max_size = get_session_max_size(); - // Static array of sessions, should not be resized... Tiger style - - // Session array - session_array = calloc(max_size, sizeof(struct session *)); - assert(session_array != NULL); - for (int i = 0; i < max_size; i++) { - session_array[i] = calloc(1, sizeof(struct session)); - assert(session_array[i] != NULL); - } - - // Status array - status_array = calloc(max_size, sizeof(char *)); - assert(status_array != NULL); - for (int i = 0; i < max_size; i++) { - status_array[i] = calloc(1, sizeof(char)); - assert(status_array[i] != NULL); - } - - // Buffer array - buffer_array = calloc(max_size, sizeof(*buffer_array)); - assert(buffer_array != NULL); - - for (int i = 0; i < max_size; i++) { - buffer_array[i] = init_buffer(0); - assert(buffer_array[i] != NULL); - } - - // Request array - request_array = calloc(max_size, sizeof(http_request_t *)); - assert(request_array != NULL); - - for (int i = 0; i < max_size; i++) { - request_array[i] = calloc(1, sizeof(http_request_t)); - assert(request_array[i] != NULL); - } - - // Response array - response_array = calloc(max_size, sizeof(http_response_t *)); - assert(response_array != NULL); - - for (int i = 0; i < max_size; i++) { - response_array[i] = calloc(1, sizeof(http_response_t)); - assert(response_array[i] != NULL); - } - - // BIO array - bio_array = malloc(max_size * sizeof(BIO *)); - assert(bio_array != NULL); - - for (int i = 0; i < max_size; i++) { - bio_array[i] = BIO_new(BIO_s_socket()); - BIO_set_nbio(bio_array[i], 1); - assert(bio_array[i] != NULL); - } - - // SSL array - if (ctx != NULL) { - ssl_array = malloc(max_size * sizeof(SSL *)); - assert(ssl_array != NULL); - - for (int i = 0; i < max_size; i++) { - ssl_array[i] = SSL_new(ctx); - assert(ssl_array[i] != NULL); - assert(SSL_is_server(ssl_array[i])); + session_node_array = calloc(get_session_max_size(), sizeof(session_node_t)); + for (int i = 0; i < get_session_max_size(); i++) { + session_node_t *node = &session_node_array[i]; + node->next = NULL; + session_t *session = &node->session; + session->failed_send_attempts = 0; + session->buffer = init_buffer(0); + assert(session->buffer != NULL); + session->bio = BIO_new(BIO_s_socket()); + assert(session->bio != NULL); + if (ctx != NULL) { + session->ssl = SSL_new(ctx); + assert(session->ssl != NULL); + assert(SSL_is_server(session->ssl)); } + // meta_t, http_request_t, http_response_t should be zeroed } return 0; } void destroy_session_cache() { - assert(session_array != NULL); - assert(status_array != NULL); - assert(buffer_array != NULL); - assert(request_array != NULL); - assert(response_array != NULL); - assert(bio_array != NULL); + assert(session_node_array != NULL); assert(pthread_mutex_destroy(&session_mutex) == 0); assert(pthread_cond_destroy(&request_read_cond) == 0); @@ -137,83 +74,39 @@ void destroy_session_cache() { session_last_index = 0; // Session array - for (int i = 0; i < max_size; i++) { - if (session_array[i] != NULL) { - free(session_array[i]); - session_array[i] = NULL; + for (int i = 0; i < get_session_max_size(); i++) { + session_node_t *node = &session_node_array[i]; + session_t *session = &node->session; + if (session->buffer != NULL) { + deinit_buffer(session->buffer); + session->buffer = NULL; } - } - free(session_array); - session_array = NULL; - - // Status array - for (int i = 0; i < max_size; i++) { - if (status_array[i] != NULL) { - free(status_array[i]); - status_array[i] = NULL; - } - } - free(status_array); - status_array = NULL; + if (session->ssl != NULL) { - // Buffer array - for (int i = 0; i < max_size; i++) { - if (buffer_array[i] != NULL) { - deinit_buffer(buffer_array[i]); - } - } - free(buffer_array); - buffer_array = NULL; - - // Request array - for (int i = 0; i < max_size; i++) { - if (request_array[i] != NULL) { - free(request_array[i]); - request_array[i] = NULL; - } - } - free(request_array); - request_array = NULL; - - // Response array - for (int i = 0; i < max_size; i++) { - if (response_array[i] != NULL) { - free(response_array[i]); - response_array[i] = NULL; - } - } - free(response_array); - response_array = NULL; - - // BIO array and SSL array - for (int i = 0; i < max_size; i++) { - if (ssl_array != NULL) { - if (SSL_get_rbio(ssl_array[i]) == NULL && - SSL_get_wbio(ssl_array[i]) == NULL) { - BIO_free(bio_array[i]); + if (SSL_get_rbio(session->ssl) == NULL && + SSL_get_wbio(session->ssl) == NULL) { + BIO_free(session->bio); } - SSL_free(ssl_array[i]); - ssl_array[i] = NULL; + SSL_free(session->ssl); + session->ssl = NULL; } else { - BIO_free(bio_array[i]); + BIO_free(session->bio); } - bio_array[i] = NULL; + session->bio = NULL; } - if (ssl_array != NULL) { - free(ssl_array); - ssl_array = NULL; - } - free(bio_array); - bio_array = NULL; + free(session_node_array); + session_node_array = NULL; + + // Status array + ll_ready_to_read = NULL; + ll_has_been_read = NULL; + ll_parsed = NULL; + ll_data_fetched = NULL; + ll_ready_to_send = NULL; } void broadcast_session() { - assert(session_array != NULL); - assert(status_array != NULL); - assert(buffer_array != NULL); - assert(request_array != NULL); - assert(response_array != NULL); - assert(bio_array != NULL); + assert(session_node_array != NULL); pthread_mutex_lock(&session_mutex); pthread_cond_broadcast(&request_read_cond); @@ -223,227 +116,59 @@ void broadcast_session() { pthread_mutex_unlock(&session_mutex); } -static inline void reset_session_at_index(int index) { - // Session array - memset(session_array[index], 0, sizeof(struct session)); - - // Status array - *status_array[index] = 0; - - // Buffer array - buffer *buffer = buffer_array[index]; - if (buffer->data != NULL) { - memset(buffer->data, 0, buffer->capacity); - } - buffer->count = 0; - - // Request array - memset(request_array[index], 0, sizeof(http_request_t)); - - // Response array - memset(response_array[index], 0, sizeof(http_response_t)); - - // BIO array - int ret = BIO_reset(bio_array[index]); - if (ret == -1) { - printf("BIO_reset failed: %s\n", ERR_error_string(ERR_get_error(), NULL)); - assert(ret != -1); - } - BIO_set_fd(bio_array[index], -1, BIO_NOCLOSE); - BIO_set_nbio(bio_array[index], 1); - - if (ssl_array != NULL) { - SSL *ssl = ssl_array[index]; - if (ssl != NULL) { - SSL_set_bio(ssl, bio_array[index], bio_array[index]); - ret = SSL_clear(ssl); - if (ret != 1) { - printf("SSL_clear failed: %s\n", - ERR_error_string(ERR_get_error(), NULL)); - assert(ret == 1); - } - } - } -} - -static inline void reset_request_at_index(int index) { - char is_ssl = request_array[index]->is_ssl; - - // Buffer array - buffer *buffer = buffer_array[index]; - if (buffer->data != NULL) { - free(buffer->data); - buffer->data = NULL; - } - buffer->capacity = 0; - buffer->count = 0; - - // Request array - memset(request_array[index], 0, sizeof(http_request_t)); - request_array[index]->is_ssl = is_ssl; -} - -// Get the session for a given thread id -int get_session_id_for_thread() { - int session_id = -1; - if (session_array != NULL) { - pthread_mutex_lock(&session_mutex); - unsigned long thread_id = (unsigned long)pthread_self(); - for (int i = 0; i < session_count; i++) { - if (session_array[i]->thread_id == thread_id) { - session_id = session_array[i]->id; - break; - } - } - pthread_mutex_unlock(&session_mutex); - } - return session_id; -} - -static inline int init_session_connection(int index) { - assert(session_array != NULL); - assert(bio_array != NULL); - assert(ssl_array != NULL); - - SSL *ssl = ssl_array[index]; - BIO *bio = bio_array[index]; - - SSL_set_bio(ssl, bio, bio); - - int ret = SSL_accept(ssl); - if (ret <= 0 || ret == 2) { - int err = SSL_get_error(ssl, ret); - // According to the SSL_accept, non-blocking socket must be handled - if (err == SSL_ERROR_WANT_READ) { - printf("SSL_ERROR_WANT_READ"); - return 1; - } else if (ERR_GET_REASON(ERR_peek_error()) == SSL_R_HTTP_REQUEST) { - printf("SSL_R_HTTP_REQUEST\n"); - return -1; - } else { - printf("SSL_accept failed, %s\n", - ERR_error_string(ERR_get_error(), NULL)); - return -1; - } - } - return ret; -} - -static inline int add_to_session(int related_fd) { - int next = session_count < max_size ? session_count : session_last_index; - assert(next < max_size); - - reset_session_at_index(next); - - session_array[next]->id = rand(); - session_array[next]->related_fd = related_fd; - session_array[next]->thread_id = 0; - *status_array[next] &= WORK_STATUS_INITIAL; - BIO_set_fd(bio_array[next], related_fd, BIO_NOCLOSE); - BIO_set_nbio(bio_array[next], 1); - - if (session_count < max_size) { - session_count++; - } else { - session_last_index = (session_last_index + 1) % max_size; - } - - return next; -} - -static inline void del_from_session(int i) { - struct session *session = session_array[i]; - char *status = status_array[i]; - buffer *buffer = buffer_array[i]; - http_request_t *request = request_array[i]; - http_response_t *response = response_array[i]; - BIO *bio = bio_array[i]; - SSL *ssl = (ssl_array != NULL) ? ssl_array[i] : NULL; - assert(ssl == NULL || ssl_array != NULL); - - reset_session_at_index(i); - - for (; i < session_count - 1; i++) { - // Shift all existing sessions to the left - session_array[i] = session_array[i + 1]; - status_array[i] = status_array[i + 1]; - buffer_array[i] = buffer_array[i + 1]; - request_array[i] = request_array[i + 1]; - response_array[i] = response_array[i + 1]; - bio_array[i] = bio_array[i + 1]; - if (ssl_array != NULL) - ssl_array[i] = ssl_array[i + 1]; - - // Replace the left shifted session with the to-be-deleted session - session_array[i + 1] = session; - status_array[i + 1] = status; - buffer_array[i + 1] = buffer; - request_array[i + 1] = request; - response_array[i + 1] = response; - if (ssl_array != NULL) - ssl_array[i + 1] = ssl; - bio_array[i + 1] = bio; - } - - if (session_count > 0) - session_count--; - - if (i < session_last_index) - session_last_index--; -} +void push_session(int related_fd, WORK_STATUS status) { + pthread_mutex_lock(&session_mutex); -void push_request(int related_fd, WORK_STATUS status) { - assert(session_array != NULL); - assert(status_array != NULL); - assert(buffer_array != NULL); - assert(request_array != NULL); - assert(response_array != NULL); - assert(bio_array != NULL); + assert(session_node_array != NULL); assert(related_fd > 0); assert(related_fd < 16384); - pthread_mutex_lock(&session_mutex); int found = 0; + bool is_new_session = + status == WORK_STATUS_INITIAL_SSL || status == WORK_STATUS_INITIAL; for (int i = 0; i < session_count; i++) { - if (session_array[i]->related_fd == related_fd) { + if (is_new_session) + break; + + if (session_node_array[i].session.meta.related_fd == related_fd) { found = 1; - assert(*status_array[i] & WORK_STATUS_PROCESSING); - *status_array[i] &= ~WORK_STATUS_PROCESSING; - *status_array[i] |= status; + session_node_t *node = &session_node_array[i]; switch (status) { - case WORK_STATUS_INITIAL_SSL: - *status_array[i] |= WORK_STATUS_INITIAL; - break; - case WORK_STATUS_INITIAL: - break; case WORK_STATUS_REQUEST_READ: + // TODO: Broadcast? + // TODO: Remove old sessions + append_linked_list(&ll_has_been_read, node); pthread_cond_broadcast(&request_read_cond); break; case WORK_STATUS_PARSED: + append_linked_list(&ll_parsed, node); pthread_cond_broadcast(&parsed_cond); break; case WORK_STATUS_DATA_FETCHED: + append_linked_list(&ll_data_fetched, node); pthread_cond_broadcast(&data_fetched_cond); break; case WORK_STATUS_READY_TO_SEND: + append_linked_list(&ll_ready_to_send, node); pthread_cond_broadcast(&response_generated_cond); break; case WORK_STATUS_SEND_FAILED: - *status_array[i] &= ~WORK_STATUS_READY_TO_SEND; + node->session.failed_send_attempts += 1; + if (node->session.failed_send_attempts >= 3) { + deinit_session(i); + } else { + append_linked_list(&ll_ready_to_send, node); + pthread_cond_broadcast(&response_generated_cond); + } break; case WORK_STATUS_SENT: - *status_array[i] &= - ~(WORK_STATUS_REQUEST_READ | WORK_STATUS_PARSED | - WORK_STATUS_DATA_FETCHED | WORK_STATUS_READY_TO_SEND); + node->session.failed_send_attempts = 0; reset_request_at_index(i); - break; - case WORK_STATUS_PROCESSING: - // Deadlocked - used for instance for failed send + append_linked_list(&ll_ready_to_read, node); break; case WORK_STATUS_REJECTED: - del_from_session(i); - mark(related_fd); + deinit_session(i); break; default: @@ -453,61 +178,51 @@ void push_request(int related_fd, WORK_STATUS status) { } } - if (found != 1 && - (status == WORK_STATUS_INITIAL || status == WORK_STATUS_INITIAL_SSL)) { - int index = add_to_session(related_fd); + if (found != 1 && is_new_session) { + int index = init_session(related_fd); + session_node_t *node = &session_node_array[index]; if (status == WORK_STATUS_INITIAL_SSL) { - switch (init_session_connection(index)) { + switch (init_ssl_session(index)) { case 1: - request_array[index]->is_ssl = 1; + node->session.request.is_ssl = 1; break; case -1: - del_from_session(index); - mark(related_fd); + deinit_session(index); break; default: assert(0); break; } } + append_linked_list(&ll_ready_to_read, node); } + pthread_mutex_unlock(&session_mutex); } -static inline int peek_next(WORK_STATUS status) { - int index = -1; - for (int i = 0; i < session_count; i++) { - if (*status_array[i] < status) - continue; - if (*status_array[i] & WORK_STATUS_PROCESSING) - continue; - if (*status_array[i] & WORK_STATUS_SEND_FAILED) - continue; - for (int bit = 4; bit >= 0; bit--) { - // All significant bits should NOT be set - if ((1 << bit) > status && (1 << bit) & *status_array[i]) { - break; - } - // This is the bit we are looking for - else if ((1 << bit) == status && status & *status_array[i]) { - index = i; - break; - } // TODO: Check if remaining bits are set - } +static session_node_t *pop_next(WORK_STATUS status) { + switch (status) { + case WORK_STATUS_REQUEST_READ: + return pop_linked_list(&ll_has_been_read); + case WORK_STATUS_PARSED: + return pop_linked_list(&ll_parsed); + case WORK_STATUS_DATA_FETCHED: + return pop_linked_list(&ll_data_fetched); + case WORK_STATUS_READY_TO_SEND: + return pop_linked_list(&ll_ready_to_send); + default: + assert(0); + return NULL; + break; } - return index; } -struct session_full_return pop_request(WORK_STATUS status) { - assert(session_array != NULL); - assert(buffer_array != NULL); - assert(request_array != NULL); - assert(response_array != NULL); - assert(bio_array != NULL); - struct session_full_return result = {NULL, NULL, NULL, NULL, NULL, NULL}; +session_t *pop_session(WORK_STATUS status) { pthread_mutex_lock(&session_mutex); - int index = -1; - while ((index = peek_next(status)) == -1 && !stop()) { + + assert(session_node_array != NULL); + session_node_t *result = NULL; + while ((result = pop_next(status)) == NULL && !is_shutdown_requested()) { switch (status) { case WORK_STATUS_REQUEST_READ: pthread_cond_wait(&request_read_cond, &session_mutex); @@ -527,61 +242,207 @@ struct session_full_return pop_request(WORK_STATUS status) { break; } } - if (index > -1) { - *status_array[index] |= WORK_STATUS_PROCESSING; - - result.session = session_array[index]; - result.buffer = buffer_array[index]; - result.request = request_array[index]; - result.response = response_array[index]; - result.bio = bio_array[index]; - if (ssl_array != NULL) { - result.ssl = ssl_array[index]; - } + + if (is_shutdown_requested()) { + printf("I am closing now!"); } pthread_mutex_unlock(&session_mutex); - return result; + return (result == NULL) ? NULL : &result->session; } -struct session_full_return pop_request_by_fd(int related_fd) { - assert(session_array != NULL); - assert(status_array != NULL); - assert(buffer_array != NULL); - assert(request_array != NULL); - assert(response_array != NULL); - assert(bio_array != NULL); +session_t *pop_session_for_read(int related_fd) { + pthread_mutex_lock(&session_mutex); + + assert(session_node_array != NULL); + assert(ll_ready_to_read != NULL); assert(related_fd > 0); assert(related_fd < 16384); - pthread_mutex_lock(&session_mutex); - struct session_full_return result = {NULL, NULL, NULL, NULL, NULL, NULL}; - int process = 0; - for (int i = 0; i < session_count; i++) { - if (session_array[i]->related_fd == related_fd) { - - if (((*status_array[i] & WORK_STATUS_SEND_FAILED) == 0) && - (*status_array[i] & WORK_STATUS_REQUEST_READ || - *status_array[i] & WORK_STATUS_PARSED || - *status_array[i] & WORK_STATUS_DATA_FETCHED || - *status_array[i] & WORK_STATUS_PROCESSING || - *status_array[i] & WORK_STATUS_READY_TO_SEND)) { - process = 1; + session_t *result = NULL; + session_node_t *node = + splice_by_fd_linked_list(&ll_ready_to_read, related_fd); + + if (node != NULL) { + result = &node->session; + } + + pthread_mutex_unlock(&session_mutex); + return result; +} + +static inline int init_session(int related_fd) { + int next = session_count < max_size ? session_count : session_last_index; + assert(next < max_size); + + reset_session_at_index(next); + + session_t *session = &session_node_array[next].session; + + session->meta.id = rand(); + session->meta.related_fd = related_fd; + BIO_set_fd(session->bio, related_fd, BIO_NOCLOSE); + BIO_set_nbio(session->bio, 1); + + if (session_count < max_size) { + session_count++; + } else { + session_last_index = (session_last_index + 1) % max_size; + } + + return next; +} + +static inline void deinit_session(int i) { + session_node_t node = session_node_array[i]; + mark(node.session.meta.related_fd); + + reset_session_at_index(i); + + for (; i < session_count - 1; i++) { + // Shift all existing sessions to the left + session_node_array[i] = session_node_array[i + 1]; + + // Replace the left shifted session with the to-be-deleted session + session_node_array[i + 1] = node; + } + + if (session_count > 0) + session_count--; + + if (i < session_last_index) + session_last_index--; +} + +static inline int init_ssl_session(int index) { + assert(session_node_array != NULL); + + SSL *ssl = session_node_array[index].session.ssl; + BIO *bio = session_node_array[index].session.bio; + + SSL_set_bio(ssl, bio, bio); + + int ret = SSL_accept(ssl); + if (ret <= 0 || ret == 2) { + int err = SSL_get_error(ssl, ret); + // According to the SSL_accept, non-blocking socket must be handled + if (err == SSL_ERROR_WANT_READ) { + printf("SSL_ERROR_WANT_READ"); + return 1; + } else if (ERR_GET_REASON(ERR_peek_error()) == SSL_R_HTTP_REQUEST) { + printf("SSL_R_HTTP_REQUEST\n"); + return -1; + } else { + printf("SSL_accept failed, %s\n", + ERR_error_string(ERR_get_error(), NULL)); + return -1; + } + } + return ret; +} + +static inline void reset_session_at_index(int index) { + session_t *session = &session_node_array[index].session; + + session->failed_send_attempts = 0; + + // Meta + memset(&session->meta, 0, sizeof(meta_t)); + + reset_request_at_index(index); + + // Response + memset(&session->response, 0, sizeof(http_response_t)); + + // BIO + int ret = BIO_reset(session->bio); + if (ret == -1) { + printf("BIO_reset failed: %s\n", ERR_error_string(ERR_get_error(), NULL)); + assert(ret != -1); + } + BIO_set_fd(session->bio, -1, BIO_NOCLOSE); + BIO_set_nbio(session->bio, 1); + + if (session->ssl != NULL) { + SSL *ssl = session->ssl; + if (ssl != NULL) { + SSL_set_bio(ssl, session->bio, session->bio); + ret = SSL_clear(ssl); + if (ret != 1) { + printf("SSL_clear failed: %s\n", + ERR_error_string(ERR_get_error(), NULL)); + assert(ret == 1); + } + } + } +} + +static inline void reset_request_at_index(int index) { + session_t *session = &session_node_array[index].session; + char is_ssl = session->request.is_ssl; + + // Buffer + buffer *buffer = session->buffer; + if (buffer->data != NULL) { + free(buffer->data); + buffer->data = NULL; + } + buffer->capacity = 0; + buffer->count = 0; + + // Request + memset(&session->request, 0, sizeof(http_request_t)); + session->request.is_ssl = is_ssl; +} + +static void append_linked_list(session_node_t **ll, session_node_t *node) { + if (*ll == NULL) { + *ll = node; + node->tail = node; + } else { + session_node_t *head = *ll; + head->tail->next = node; + head->tail = node; + } +} + +static session_node_t *pop_linked_list(session_node_t **ll) { + if (*ll == NULL) + return NULL; + + session_node_t *head = *ll; + session_node_t *new_head = head->next; + + if (head->next != NULL) { + new_head->tail = head->tail; + } + *ll = new_head; + + head->next = NULL; + head->tail = NULL; + + return head; +} + +static session_node_t *splice_by_fd_linked_list(session_node_t **ll, int fd) { + session_node_t *curr = *ll; + session_node_t *prev = NULL; + + while (curr != NULL) { + if (curr->session.meta.related_fd == fd) { + if (prev == NULL) { + return pop_linked_list(ll); } else { - *status_array[i] |= WORK_STATUS_PROCESSING; - result.session = session_array[i]; - result.buffer = buffer_array[i]; - result.request = request_array[i]; - result.response = response_array[i]; - result.bio = bio_array[i]; - if (ssl_array != NULL) { - result.ssl = ssl_array[i]; - } + prev->next = curr->next; + curr->next = NULL; } + break; } + + prev = curr; + curr = curr->next; } - pthread_mutex_unlock(&session_mutex); - return result; + return curr; } diff --git a/src/data_structures/session.h b/src/data_structures/session.h index d8ab3cd..133ebc4 100644 --- a/src/data_structures/session.h +++ b/src/data_structures/session.h @@ -7,11 +7,10 @@ int init_session_cache(SSL_CTX *ctx); void destroy_session_cache(); -int get_session_id_for_thread(); void broadcast_session(); // Only works for INITIAL and SENT, since they can receive new data -void push_request(int related_fd, WORK_STATUS status); -struct session_full_return pop_request_by_fd(int related_fd); -struct session_full_return pop_request(WORK_STATUS status); +void push_session(int related_fd, WORK_STATUS status); +session_t *pop_session_for_read(int related_fd); +session_t *pop_session(WORK_STATUS status); #endif // SESSION_H diff --git a/src/handlers/builder.c b/src/handlers/builder.c index d22f24f..fa58a47 100644 --- a/src/handlers/builder.c +++ b/src/handlers/builder.c @@ -1,3 +1,4 @@ +#include "../data_structures/current_session.h" #include "../data_structures/session.h" #include "../http/response.h" #include "../shutdown/stop.h" @@ -5,39 +6,37 @@ #include "../utils/logger.h" void *handle_c() { - struct session_full_return session; + session_t *session; buffer *tmp_buffer = init_buffer(0); - while (!stop()) { + while (!is_shutdown_requested()) { memset(tmp_buffer->data, 0, tmp_buffer->capacity); - session = pop_request(WORK_STATUS_DATA_FETCHED); + session = pop_session(WORK_STATUS_DATA_FETCHED); - if (session.session == NULL) { + if (session == NULL) { continue; } - session.session->thread_id = (long unsigned int)pthread_self(); + put_current_session(session->meta.id); log_trace("Request %d (%d) is being handled by response builder", - session.session->id, session.session->related_fd); + session->meta.id, session->meta.related_fd); - assert(session.request != NULL); - assert(session.response != NULL); - assert(session.buffer != NULL); + assert(session->buffer != NULL); // Deprecated - if (session.response->status_code == HTTP_MOVED_PERMANENTLY) { + if (session->response.status_code == HTTP_MOVED_PERMANENTLY) { construct_upgrade_to_https_response( - session.request->uri, session.request->host, session.buffer); + session->request.uri, session->request.host, session->buffer); } else { - construct_response(session.response, session.request->uri, - session.request->accept_encoding, - session.request->if_none_match, session.buffer, + construct_response(&session->response, session->request.uri, + session->request.accept_encoding, + session->request.if_none_match, session->buffer, tmp_buffer); } WORK_STATUS next_status = WORK_STATUS_READY_TO_SEND; - session.session->thread_id = 0; - push_request(session.session->related_fd, next_status); + clear_current_session(); + push_session(session->meta.related_fd, next_status); } deinit_buffer(tmp_buffer); diff --git a/src/handlers/loader.c b/src/handlers/loader.c index e2b0643..3aea213 100644 --- a/src/handlers/loader.c +++ b/src/handlers/loader.c @@ -1,3 +1,4 @@ +#include "../data_structures/current_session.h" #include "../data_structures/session.h" #include "../http/response.h" #include "../properties.h" @@ -29,7 +30,7 @@ static inline int load_index(sqlite3 *db, sqlite3_stmt **index_projects_stmt, buffer *tmp_buffer, buffer *buffer); static inline void str_replace(buffer *target, const char *needle, const char *replacement); -static inline unsigned int gen_error_body(http_status_code http_status_code, +static inline unsigned int gen_error_body(HTTP_STATUS_CODE http_status_code, buffer *buffer) { const char *status_code_str = http_status_code_to_str(http_status_code); assert(ENSURE_CAPACITY(buffer, 512) > 0); @@ -61,61 +62,59 @@ void *handle_b() { sqlite3 *db = NULL; sqlite3_stmt *index_projects_stmt = NULL, *index_posts_stmt = NULL, *project_stmt = NULL, *post_stmt = NULL; - struct session_full_return session; + session_t *session; assert_log(sqlite3_open_v2(get_db_url(), &db, WB_SQLITE_OPEN_FLAGS, NULL) == SQLITE_OK, "Failed to open database: %s", sqlite3_errmsg(db)); - while (!stop()) { - session = pop_request(WORK_STATUS_PARSED); + while (!is_shutdown_requested()) { + session = pop_session(WORK_STATUS_PARSED); - if (session.session == NULL) { + if (session == NULL) { continue; } - session.session->thread_id = (long unsigned int)pthread_self(); + put_current_session(session->meta.id); log_trace("Request %d (%d) is being handled by response content loader", - session.session->id, session.session->related_fd); + session->meta.id, session->meta.related_fd); - assert(session.request != NULL); - assert(session.response != NULL); - assert(session.buffer != NULL); + assert(session->buffer != NULL); // Request already parsed - memset(session.buffer->data, 0, session.buffer->count); + memset(session->buffer->data, 0, session->buffer->count); - if (session.response->status_code == HTTP_OK) { - const char *file_name = uri_to_file_name(session.request->uri); - read_static_file(file_name, session.buffer); + if (session->response.status_code == HTTP_OK) { + const char *file_name = uri_to_file_name(session->request.uri); + read_static_file(file_name, session->buffer); // Fetch and inject arguments switch (load(db, &index_projects_stmt, &index_posts_stmt, &project_stmt, - &post_stmt, file_name, tmp_buffer, session.buffer, - session.request->uri[1])) { + &post_stmt, file_name, tmp_buffer, session->buffer, + session->request.uri[1])) { case 0: break; case -1: log_error("Failed to fetch and inject arguments"); - session.response->status_code = HTTP_INTERNAL_SERVER_ERROR; - session.buffer->count = - gen_error_body(session.response->status_code, session.buffer); + session->response.status_code = HTTP_INTERNAL_SERVER_ERROR; + session->buffer->count = + gen_error_body(session->response.status_code, session->buffer); break; case -2: - log_error("Id not found in database: %s", session.request->uri[1]); - session.response->status_code = HTTP_NOT_FOUND; - session.buffer->count = - gen_error_body(session.response->status_code, session.buffer); + log_error("Id not found in database: %s", session->request.uri[1]); + session->response.status_code = HTTP_NOT_FOUND; + session->buffer->count = + gen_error_body(session->response.status_code, session->buffer); break; } } else { - session.buffer->count = - gen_error_body(session.response->status_code, session.buffer); + session->buffer->count = + gen_error_body(session->response.status_code, session->buffer); } - session.session->thread_id = 0; - push_request(session.session->related_fd, WORK_STATUS_DATA_FETCHED); + clear_current_session(); + push_session(session->meta.related_fd, WORK_STATUS_DATA_FETCHED); memset(tmp_buffer->data, 0, tmp_buffer->count); tmp_buffer->count = 0; } @@ -316,7 +315,7 @@ static inline void str_replace(buffer *target, const char *needle, assert(hit >= target->data + offset); // Capacity is greater than the difference between needle and replacement // word - if(diff >= 0) + if (diff >= 0) assert(ENSURE_CAPACITY(target, target->count + diff + 1) > 0); // If the target->data has been reallocated diff --git a/src/handlers/parser.c b/src/handlers/parser.c index 308490f..7c2ac84 100644 --- a/src/handlers/parser.c +++ b/src/handlers/parser.c @@ -1,3 +1,4 @@ +#include "../data_structures/current_session.h" #include "../data_structures/session.h" #include "../http/request.h" #include "../shutdown/stop.h" @@ -6,46 +7,45 @@ #include "../utils/logger.h" void *handle_a() { - struct session_full_return session; - while (!stop()) { - session = pop_request(WORK_STATUS_REQUEST_READ); + session_t *session; + while (!is_shutdown_requested()) { + session = pop_session(WORK_STATUS_REQUEST_READ); - if (session.session == NULL) { + if (session == NULL) { continue; } - session.session->thread_id = (long unsigned int)pthread_self(); + put_current_session(session->meta.id); log_trace("Request %d (%d) is being handled by request parser", - session.session->id, session.session->related_fd); + session->meta.id, session->meta.related_fd); - assert(session.request != NULL); - assert(session.response != NULL); - assert(session.buffer != NULL); + assert(session->buffer != NULL); - log_trace("Parsing request:\n%s", session.buffer->data); + log_trace("Parsing request:\n%s", session->buffer->data); - int result = parse_http_request(session.request, session.buffer); + int result = parse_http_request(&session->request, session->buffer); if (result == ALLOCATE_MEMORY_ERROR) { - session.response->status_code = HTTP_INTERNAL_SERVER_ERROR; + session->response.status_code = HTTP_INTERNAL_SERVER_ERROR; } else if (result < 0) { - session.response->status_code = HTTP_NOT_ACCEPTABLE; + session->response.status_code = HTTP_NOT_ACCEPTABLE; } else { - session.response->status_code = validate_request_headers(session.request); + session->response.status_code = + validate_request_headers(&session->request); } log_info("Parsed:\nmethod: %d, uri_tokens: %s %s %s %s, version: %s, " "host: %s, user_agent: %s, accept: %s, " "accept_language: %s, accept_encoding: %s, connection: %d", - session.request->method, session.request->uri[0], - session.request->uri[1], session.request->uri[2], - session.request->uri[3], session.request->version, - session.request->host, session.request->user_agent, - session.request->accept, session.request->accept_language, - session.request->accept_encoding, session.request->connection); - - session.session->thread_id = 0; - push_request(session.session->related_fd, WORK_STATUS_PARSED); + session->request.method, session->request.uri[0], + session->request.uri[1], session->request.uri[2], + session->request.uri[3], session->request.version, + session->request.host, session->request.user_agent, + session->request.accept, session->request.accept_language, + session->request.accept_encoding, session->request.connection); + + clear_current_session(); + push_session(session->meta.related_fd, WORK_STATUS_PARSED); } return NULL; diff --git a/src/handlers/sender.c b/src/handlers/sender.c index cfac1bc..5923c69 100644 --- a/src/handlers/sender.c +++ b/src/handlers/sender.c @@ -1,3 +1,4 @@ +#include "../data_structures/current_session.h" #include "../data_structures/session.h" #include "../shutdown/stop.h" #include "../socket.h" @@ -6,35 +7,33 @@ #include void *handle_d() { - struct session_full_return session; - while (!stop()) { - session = pop_request(WORK_STATUS_READY_TO_SEND); + session_t *session; + while (!is_shutdown_requested()) { + session = pop_session(WORK_STATUS_READY_TO_SEND); - if (session.session == NULL) { + if (session == NULL) { continue; } - session.session->thread_id = (long unsigned int)pthread_self(); + put_current_session(session->meta.id); log_trace("Request %d (%d) is being handled by response sender", - session.session->id, session.session->related_fd); + session->meta.id, session->meta.related_fd); - assert(session.request != NULL); - assert(session.response != NULL); - assert(session.buffer != NULL); + assert(session->buffer != NULL); - char is_ssl = session.request->is_ssl; - log_info("Sending response to session %d:\n%.*s", session.session->id, - session.buffer->count, session.buffer->data); + char is_ssl = session->request.is_ssl; + log_info("Sending response to session %d:\n%.*s", session->meta.id, + session->buffer->count, session->buffer->data); - int send_return = (is_ssl) ? send_ssl(session.ssl, session.buffer) - : send_bio(session.bio, session.buffer); + int send_return = (is_ssl) ? send_ssl(session->ssl, session->buffer) + : send_bio(session->bio, session->buffer); WORK_STATUS next_status = WORK_STATUS_SENT; log_debug("send_return: %d, buffer_size: %lu", send_return, - session.buffer->count); + session->buffer->count); if (is_ssl && send_return <= 0) { - int ssl_error = SSL_get_error(session.ssl, send_return); + int ssl_error = SSL_get_error(session->ssl, send_return); if (ssl_error == SSL_ERROR_WANT_READ) { log_debug("SSL should retry read, pushing data back to queue..."); next_status = WORK_STATUS_SEND_FAILED; @@ -43,32 +42,32 @@ void *handle_d() { next_status = WORK_STATUS_READY_TO_SEND; } else { log_debug("Could not send data to fd %d, closing connection...", - session.session->related_fd); + session->meta.related_fd); next_status = WORK_STATUS_REJECTED; } } else if (!is_ssl && send_return <= 0 && - BIO_should_retry(session.bio) == 1) { + BIO_should_retry(session->bio) == 1) { log_debug("BIO should retry, pushing data back to queue..."); next_status = WORK_STATUS_SEND_FAILED; - } else if (send_return < session.buffer->count) { + } else if (send_return < session->buffer->count) { log_debug("Partial send, pushing data back to sending queue..."); - memmove(session.buffer->data, session.buffer->data + send_return, - session.buffer->count - send_return); - session.buffer->count -= send_return; + memmove(session->buffer->data, session->buffer->data + send_return, + session->buffer->count - send_return); + session->buffer->count -= send_return; next_status = WORK_STATUS_READY_TO_SEND; } else if (send_return == -1) { log_debug("Could not send data to fd %d, closing connection...", - session.session->related_fd); + session->meta.related_fd); next_status = WORK_STATUS_REJECTED; - } else if (session.request->connection == CLOSE) { + } else if (session->request.connection == CLOSE) { log_debug( "Connection close requested, closing connection on fd: %d, id: %d...", - session.session->related_fd, session.session->id); + session->meta.related_fd, session->meta.id); next_status = WORK_STATUS_REJECTED; } - session.session->thread_id = 0; - push_request(session.session->related_fd, next_status); + clear_current_session(); + push_session(session->meta.related_fd, next_status); } return NULL; } diff --git a/src/http/request.c b/src/http/request.c index b10a497..7631008 100644 --- a/src/http/request.c +++ b/src/http/request.c @@ -11,7 +11,7 @@ int parse_header_field(http_request_t *http_request, const char *char_data); // int parse_trailer_fields(http_request_t* http_request, char * char_data); // int extract_body(http_request_t* http_request, char * char_data, long // content_size); -http_method method_str_to_enum(const char *raw_method); +HTTP_METHOD method_str_to_enum(const char *raw_method); int parse_http_request(http_request_t *http_request, const buffer *raw_request) { @@ -126,7 +126,7 @@ int validate_request_headers(const http_request_t *http_request) { int parse_control_data(http_request_t *http_request, const char *raw_control_data) { - http_method method; + HTTP_METHOD method; const char *pattern = "^([A-Z]{2,12}) ([^ ]+) (HTTP/[0-9.]{3})$"; regmatch_t matches[4]; // Method, URI, Version @@ -251,7 +251,7 @@ int parse_header_field(http_request_t *http_request, return 0; } -http_method method_str_to_enum(const char *raw_method) { +HTTP_METHOD method_str_to_enum(const char *raw_method) { if (strncasecmp(raw_method, "POST", 4) == 0) return HTTP_POST; else if (strncasecmp(raw_method, "GET", 3) == 0) diff --git a/src/http/response.c b/src/http/response.c index 0f8290c..8c96cea 100644 --- a/src/http/response.c +++ b/src/http/response.c @@ -8,7 +8,7 @@ #include #include -char *http_status_code_to_str(http_status_code status_code); +char *http_status_code_to_str(HTTP_STATUS_CODE status_code); unsigned int to_string(const http_response_t *http_response, const buffer *body, buffer *dest); unsigned int set_content_length(http_response_t *http_response, @@ -61,7 +61,7 @@ unsigned int construct_response(http_response_t *http_response, } return_value = to_string(http_response, tmp_buffer, body); - if(return_value >= 0) + if (return_value >= 0) body->count = return_value; return return_value; @@ -289,7 +289,7 @@ unsigned int to_string(const http_response_t *http_response, const buffer *body, return offset; } -char *http_status_code_to_str(http_status_code status_code) { +char *http_status_code_to_str(HTTP_STATUS_CODE status_code) { switch (status_code) { case HTTP_OK: return "OK"; diff --git a/src/http/response.h b/src/http/response.h index 494b88f..7c863ee 100644 --- a/src/http/response.h +++ b/src/http/response.h @@ -10,5 +10,5 @@ unsigned int construct_response(http_response_t *http_response, unsigned int construct_upgrade_to_https_response(const uri_token_t uri, const char *host, buffer *buffer); -char *http_status_code_to_str(http_status_code status_code); +char *http_status_code_to_str(HTTP_STATUS_CODE status_code); #endif // HTTP_RESPONSE_H diff --git a/src/listener.c b/src/listener.c index bbb7532..2538ae5 100644 --- a/src/listener.c +++ b/src/listener.c @@ -20,24 +20,24 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)); int handle_request_async(int fd) { int recv_return; - struct session_full_return session = pop_request_by_fd(fd); - if (session.session == NULL) { + session_t *session = pop_session_for_read(fd); + if (session == NULL) { log_debug("Session is NULL for fd %d", fd); return -1; } - assert(session.bio != NULL); + assert(session->bio != NULL); - if (session.request->is_ssl) { + if (session->request.is_ssl) { log_debug("Reading SSL request for fd %d", fd); - assert(session.ssl != NULL); + assert(session->ssl != NULL); // Alloc more space for the data - int recv_return = recv_ssl(session.ssl, session.buffer); + recv_return = recv_ssl(session->ssl, session->buffer); if (recv_return > 0) { - push_request(fd, WORK_STATUS_REQUEST_READ); + push_session(fd, WORK_STATUS_REQUEST_READ); return 0; - } else if (recv_return <= 0 && - SSL_get_error(session.ssl, recv_return) == SSL_ERROR_WANT_READ) { - push_request(fd, WORK_STATUS_INITIAL); + } else if (recv_return <= 0 && SSL_get_error(session->ssl, recv_return) == + SSL_ERROR_WANT_READ) { + push_session(fd, WORK_STATUS_INITIAL); return 0; } else { // Got error or connection closed by client @@ -49,12 +49,12 @@ int handle_request_async(int fd) { } } else { log_debug("Reading plain request for fd %d", fd); - recv_return = recv_bio(session.bio, session.buffer); + recv_return = recv_bio(session->bio, session->buffer); if (recv_return > 0) { - push_request(fd, WORK_STATUS_REQUEST_READ); + push_session(fd, WORK_STATUS_REQUEST_READ); return 0; - } else if (recv_return <= 0 && BIO_should_retry(session.bio)) { - push_request(fd, WORK_STATUS_INITIAL); + } else if (recv_return <= 0 && BIO_should_retry(session->bio)) { + push_session(fd, WORK_STATUS_INITIAL); return 0; } else { log_error("BIO read error"); @@ -139,7 +139,7 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)) { sigemptyset(&sigmask); // Continously listen for new connections - while (!stop()) { + while (!is_shutdown_requested()) { assert(count > 0); log_info("Number of active sockets (including listener): %d", count); @@ -168,7 +168,7 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)) { if (check_for_socket_error(poll->fd) == -1) { log_debug("Socket %d is invalid, removing...", poll->fd); - push_request(poll->fd, WORK_STATUS_REJECTED); + push_session(poll->fd, WORK_STATUS_REJECTED); remove_poll_fd_by_index_sync(&i); continue; } @@ -179,7 +179,7 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)) { assert(error_class != RESET); if (error_class == REMOVE_FD) { log_debug("Socket %d error event, removing...", poll->fd); - push_request(poll->fd, WORK_STATUS_REJECTED); + push_session(poll->fd, WORK_STATUS_REJECTED); remove_poll_fd_by_index_sync(&i); continue; } @@ -192,14 +192,14 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)) { else if (poll->revents & POLLNVAL) log_debug("%s", get_poll_event_description(POLLNVAL)); if (check_for_socket_error(poll->fd) == -1) { - push_request(poll->fd, WORK_STATUS_REJECTED); + push_session(poll->fd, WORK_STATUS_REJECTED); remove_poll_fd_by_index_sync(&i); continue; } } if (poll->revents & POLLOUT) { log_debug("Socket %d is ready for writing", poll->fd); - push_request(poll->fd, WORK_STATUS_READY_TO_SEND); + push_session(poll->fd, WORK_STATUS_READY_TO_SEND); continue; } @@ -220,11 +220,11 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)) { if (poll->fd == listener) { log_debug("New client connected on HTTP socket %s:%d", ip_str, get_in_addr_port((struct sockaddr *)&client_addr)); - push_request(client_socket_fd, WORK_STATUS_INITIAL); + push_session(client_socket_fd, WORK_STATUS_INITIAL); } else { log_debug("New client connected on HTTPS socket %s:%d", ip_str, get_in_addr_port((struct sockaddr *)&client_addr)); - push_request(client_socket_fd, WORK_STATUS_INITIAL_SSL); + push_session(client_socket_fd, WORK_STATUS_INITIAL_SSL); } int poll_array_index = add_poll_fd_sync(client_socket_fd); continue; @@ -232,7 +232,7 @@ void _listen(int listener, int listener_ssl, int (*request_handler)(int)) { log_debug("Polling for existing fd %d to send data...", poll->fd); if (request_handler(poll->fd) == -1) { - push_request(poll->fd, WORK_STATUS_REJECTED); + push_session(poll->fd, WORK_STATUS_REJECTED); remove_poll_fd_by_index_sync(&i); } } diff --git a/src/main.c b/src/main.c index faa2a23..9da6113 100644 --- a/src/main.c +++ b/src/main.c @@ -19,7 +19,7 @@ static int status = 0; void handle_exit(int signum) { log_info("Socket closed due to signal %s", get_signal_description(signum)); - stop_server(); + request_shutdown(); if (http_socket_fd != -1 && status >= 5) { close(http_socket_fd); } diff --git a/src/shutdown/stop.c b/src/shutdown/stop.c index b3e80cc..d1f51e6 100644 --- a/src/shutdown/stop.c +++ b/src/shutdown/stop.c @@ -3,16 +3,16 @@ #include #include -static volatile sig_atomic_t server_stop = 0; +static volatile sig_atomic_t shutdown_requested = 0; -sig_atomic_t stop() { return server_stop; } +sig_atomic_t is_shutdown_requested() { return shutdown_requested; } -void _stop_server(const char *file) { +void _request_shutdown(const char *file) { assert(file != NULL); if (strstr(file, "main.c") == NULL) { return; } - server_stop = 1; + shutdown_requested = 1; } diff --git a/src/shutdown/stop.h b/src/shutdown/stop.h index 3dfecb6..f706da8 100644 --- a/src/shutdown/stop.h +++ b/src/shutdown/stop.h @@ -3,9 +3,9 @@ #include -sig_atomic_t stop(); -void _stop_server(const char *file); +sig_atomic_t is_shutdown_requested(); +void _request_shutdown(const char *file); -#define stop_server() _stop_server(__FILE__) +#define request_shutdown() _request_shutdown(__FILE__) #endif // STOP_H diff --git a/src/static.h b/src/static.h index 5893bf3..52c35d0 100644 --- a/src/static.h +++ b/src/static.h @@ -34,9 +34,9 @@ typedef enum { HTTP_OPTIONS, HTTP_TRACE, HTTP_BAD_METHOD -} http_method; +} HTTP_METHOD; -typedef enum { KEEP_ALIVE, CLOSE } http_connection; +typedef enum { KEEP_ALIVE, CLOSE } HTTP_CONNECTION; typedef enum { HTTP_OK = 200, @@ -49,7 +49,7 @@ typedef enum { HTTP_INTERNAL_SERVER_ERROR = 500, HTTP_NOT_IMPLEMENTED = 501, HTTP_SERVICE_UNAVAILABLE = 503 -} http_status_code; +} HTTP_STATUS_CODE; typedef enum { WORK_STATUS_INITIAL = 1, @@ -68,11 +68,11 @@ typedef struct { int fd; int size; char *data; -} worker_data; +} worker_data_t; typedef struct { void *arg; -} worker_arg; +} worker_arg_t; // TODO: // Upgrade-Insecure-Requests: 1 @@ -83,7 +83,7 @@ typedef struct { // Priority: u=0, i typedef struct { - http_status_code status_code; + HTTP_STATUS_CODE status_code; char content_type[HTTP_HEADER_SIZE]; long content_length; char content_language[HTTP_HEADER_SMALL_SIZE]; @@ -98,7 +98,7 @@ typedef char uri_token_t[HTTP_URI_TOKEN_COUNT][HTTP_URI_TOKEN_SIZE]; typedef struct { uri_token_t uri; - http_method method; + HTTP_METHOD method; char is_ssl; char version[HTTP_VERSION_SIZE]; char host[HTTP_HEADER_SIZE]; @@ -107,33 +107,41 @@ typedef struct { char accept_language[HTTP_HEADER_SIZE]; char accept_encoding[HTTP_HEADER_SIZE]; char if_none_match[HTTP_HEADER_ETAG_SIZE]; - http_connection connection; + HTTP_CONNECTION connection; char content_type[HTTP_HEADER_SIZE]; long content_length; } http_request_t; typedef struct { - worker_data **data; + worker_data_t **data; int count, front, rear; pthread_mutex_t mutex; pthread_cond_t cond; } queue_t; -struct session { +typedef struct { int id; int related_fd; - unsigned long thread_id; -}; +} meta_t; -struct session_full_return { - struct session *session; +typedef struct { + meta_t meta; BIO *bio; SSL *ssl; buffer *buffer; - http_request_t *request; - http_response_t *response; + http_request_t request; + http_response_t response; + unsigned char failed_send_attempts; +} session_t; + +struct session_node { + struct session_node *next; + struct session_node *tail; + session_t session; }; +typedef struct session_node session_node_t; + char *uri_to_file_name(const uri_token_t uri); #endif // STATIC_H diff --git a/src/utils/logger.c b/src/utils/logger.c index 28ae06d..edb76ac 100644 --- a/src/utils/logger.c +++ b/src/utils/logger.c @@ -1,5 +1,5 @@ #include "logger.h" -#include "../data_structures/session.h" +#include "../data_structures/current_session.h" #include "../static.h" #include #include @@ -202,7 +202,7 @@ log_to_buffer(int log_destination, const char *time_str, va_list args_f, va_list args_c, int level) { unsigned int offset = 0; offset += sprintf(log_buffer + (log_buffer_offset + offset), - YEL "%-17s %-30s (%-5s) " RESET, time_str, + YEL "%-21s %-30s (%-5s) " RESET, time_str, relative_file_path, log_level_to_string(level)); if (session_id != -1) { offset += @@ -248,20 +248,21 @@ log_to_buffer(int log_destination, const char *time_str, inline void log_message(LOG_LEVEL level, const char *file, const char *message, va_list args_f, va_list args_c) { time_t rawtime; - struct tm *timeinfo; - char time_str[20]; + struct tm timeinfo; + char temp[20], time_str[24]; pthread_t thread_id = pthread_self(); - int session_id = get_session_id_for_thread(); + int session_id = get_current_session(); char *relative_file_path = (strstr(file, "src/") != NULL) ? strstr(file, "src/") + 4 : strstr(file, "test/") + 5; - // Get the current time - time(&rawtime); - timeinfo = localtime(&rawtime); + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); - // Format the time as YYYY-MM-DD HH:MM:SS - strftime(time_str, sizeof(time_str), "%Y%m%d %H:%M:%S", timeinfo); + localtime_r(&ts.tv_sec, &timeinfo); + + strftime(temp, sizeof(temp), "%Y%m%d %H:%M:%S", &timeinfo); + snprintf(time_str, sizeof(time_str), "%s.%03ld", temp, ts.tv_nsec / 1000000); pthread_mutex_lock(&log_mutex);