diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 51bf1ca..7b66466 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -2,16 +2,31 @@ name: CI on: push: - branches: [ "main" ] + branches-ignore: + - main pull_request: - branches: [ "main" ] + branches: + - main jobs: build: - runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - - name: make - run: make + - name: Checkout Cactus repository + uses: actions/checkout@v4 + with: + path: csky + + - name: Checkout Crux repository as a sibling + uses: actions/checkout@v4 + with: + repository: meslab/crux # Replace with actual org/user + path: crux + + - name: Build Crux + run: make -C crux + + - name: Build CSky + run: make -C csky + diff --git a/include/ads_b.h b/include/ads_b.h index 7a6c1ca..b12e383 100644 --- a/include/ads_b.h +++ b/include/ads_b.h @@ -12,14 +12,14 @@ #define ADSB_SHORT_LEN 7 typedef struct { - uint32_t header; // 4 bytes (DF, CA, ICAO) - uint8_t data[7]; // 7 bytes (ADS-B payload) - uint8_t parity[3]; // 3 bytes (Parity) + uint32_t header; // 4 bytes (DF, CA, ICAO) + uint8_t data[7]; // 7 bytes (ADS-B payload) + uint8_t parity[3]; // 3 bytes (Parity) } adsbExtendedMessage; typedef struct { - uint32_t header; // 4 bytes (DF, CA, ICAO) - uint8_t parity[3]; // 3 bytes (Parity) + uint32_t header; // 4 bytes (DF, CA, ICAO) + uint8_t parity[3]; // 3 bytes (Parity) } adsbShortMessage; void adsb_squitter_parse(Logger *logger, const char *hex_str); @@ -27,6 +27,6 @@ void adsb_squitter_parse(Logger *logger, const char *hex_str); void adsb_ext_message_parse(uint8_t full_message[ADSB_EXT_LEN], Logger *logger); void adsb_short_message_parse(uint8_t full_message[ADSB_SHORT_LEN], - Logger *logger); + Logger *logger); #endif // ADS_B_H diff --git a/include/arg_parser.h b/include/arg_parser.h index 46b125e..a97b23f 100644 --- a/include/arg_parser.h +++ b/include/arg_parser.h @@ -5,12 +5,12 @@ #include typedef struct Options { - uint16_t tcp_port; - uint8_t verbose; - char *tcp_server; - char *log_level; - char *err_log; - char *out_log; + uint16_t tcp_port; + uint8_t verbose; + char *tcp_server; + char *log_level; + char *err_log; + char *out_log; } Options; void arguments_parse(int argc, char *argv[], Options *opt); diff --git a/include/client.h b/include/client.h index 0a57497..19f011f 100644 --- a/include/client.h +++ b/include/client.h @@ -6,14 +6,14 @@ #include "../include/ring_buffer.h" typedef struct { - ringBuffer *rb; - Options *opts; - Logger *logger; + ringBuffer *rb; + Options *opts; + Logger *logger; } TcpClientArgs; int8_t tcp_client_thread_init(TcpClientArgs *tcp_client_args, - ringBuffer *ring_buffer, Options *opts, - Logger *logger); + ringBuffer *ring_buffer, Options *opts, + Logger *logger); void *tcp_client_thread(void *arg); void lines_read(int sockfd, ringBuffer *rb); diff --git a/include/processor.h b/include/processor.h index b221028..162a691 100644 --- a/include/processor.h +++ b/include/processor.h @@ -6,14 +6,14 @@ #include "../include/ring_buffer.h" typedef struct { - ringBuffer *rb; - Options *opts; - Logger *logger; + ringBuffer *rb; + Options *opts; + Logger *logger; } ProcessorArgs; int8_t data_processor_thread_args_init(ProcessorArgs *data_processor_args, - ringBuffer *ring_buffer, Options *opts, - Logger *logger); + ringBuffer *ring_buffer, Options *opts, + Logger *logger); void *data_processor_thread(void *arg); size_t squitter_strip_chars(char *str); diff --git a/include/ring_buffer.h b/include/ring_buffer.h index 1d41938..21ca8be 100644 --- a/include/ring_buffer.h +++ b/include/ring_buffer.h @@ -8,12 +8,12 @@ #define MAX_LINE_LENGTH 63 // Max length of a single line typedef struct { - char buffer[BUFFER_SIZE][MAX_LINE_LENGTH + 1]; // Extra byte for '\0' - unsigned int head; - unsigned int tail; - pthread_mutex_t mutex; - pthread_cond_t not_empty; - pthread_cond_t not_full; + char buffer[BUFFER_SIZE][MAX_LINE_LENGTH + 1]; // Extra byte for '\0' + unsigned int head; + unsigned int tail; + pthread_mutex_t mutex; + pthread_cond_t not_empty; + pthread_cond_t not_full; } ringBuffer; void ring_buffer_init(ringBuffer *rb); diff --git a/src/ads_b.c b/src/ads_b.c index 810d6dc..a3fa46b 100644 --- a/src/ads_b.c +++ b/src/ads_b.c @@ -5,74 +5,74 @@ /// @param bytes /// @param len void hex_to_bytes(const char *hex_str, uint8_t *bytes, size_t len) { - for (size_t i = 0; i < len; i++) { - sscanf(hex_str + 2 * i, "%2hhx", &bytes[i]); - } + for (size_t i = 0; i < len; i++) { + sscanf(hex_str + 2 * i, "%2hhx", &bytes[i]); + } } /// @brief Process the ADS-B hex string /// @param logger /// @param hex_str The hex string to process void adsb_squitter_parse(Logger *logger, const char *hex_str) { - size_t len = strlen(hex_str); + size_t len = strlen(hex_str); - uint8_t short_message[7]; // Maximum size: 7 bytes (56 bits) - uint8_t full_message[14]; // Maximum size: 14 bytes (112 bits) + uint8_t short_message[7]; // Maximum size: 7 bytes (56 bits) + uint8_t full_message[14]; // Maximum size: 14 bytes (112 bits) - switch (len) { - case 26: - hex_to_bytes(hex_str + 12, short_message, len / 2 - 6); - adsb_short_message_parse(short_message, logger); - break; - case 40: - hex_to_bytes(hex_str + 12, full_message, len / 2 - 6); - adsb_ext_message_parse(full_message, logger); - break; - default: - error_log(logger, "Invalid hex string length\n"); - break; - } + switch (len) { + case 26: + hex_to_bytes(hex_str + 12, short_message, len / 2 - 6); + adsb_short_message_parse(short_message, logger); + break; + case 40: + hex_to_bytes(hex_str + 12, full_message, len / 2 - 6); + adsb_ext_message_parse(full_message, logger); + break; + default: + error_log(logger, "Invalid hex string length\n"); + break; + } } /// @brief Process an extended message /// @param full_message /// @param logger void adsb_ext_message_parse(uint8_t full_message[ADSB_EXT_LEN], - Logger *logger) { - adsbExtendedMessage msg_ext; - msg_ext.header = (full_message[0] << 24) | (full_message[1] << 16) | - (full_message[2] << 8) | full_message[3]; - memcpy(msg_ext.data, full_message + 4, 7); - memcpy(msg_ext.parity, full_message + 11, 3); + Logger *logger) { + adsbExtendedMessage msg_ext; + msg_ext.header = (full_message[0] << 24) | (full_message[1] << 16) | + (full_message[2] << 8) | full_message[3]; + memcpy(msg_ext.data, full_message + 4, 7); + memcpy(msg_ext.parity, full_message + 11, 3); - char data_str[3 * 7]; // Each byte: "XX " (3 chars), 7 bytes -> 21 chars - char parity_str[3 * 3]; // Each byte: "XX " (3 chars), 3 bytes -> 9 chars + char data_str[3 * 7]; // Each byte: "XX " (3 chars), 7 bytes -> 21 chars + char parity_str[3 * 3]; // Each byte: "XX " (3 chars), 3 bytes -> 9 chars - snprintf(data_str, sizeof(data_str), "%02X %02X %02X %02X %02X %02X %02X", - msg_ext.data[0], msg_ext.data[1], msg_ext.data[2], msg_ext.data[3], - msg_ext.data[4], msg_ext.data[5], msg_ext.data[6]); + snprintf(data_str, sizeof(data_str), "%02X %02X %02X %02X %02X %02X %02X", + msg_ext.data[0], msg_ext.data[1], msg_ext.data[2], msg_ext.data[3], + msg_ext.data[4], msg_ext.data[5], msg_ext.data[6]); - snprintf(parity_str, sizeof(parity_str), "%02X %02X %02X", msg_ext.parity[0], - msg_ext.parity[1], msg_ext.parity[2]); + snprintf(parity_str, sizeof(parity_str), "%02X %02X %02X", msg_ext.parity[0], + msg_ext.parity[1], msg_ext.parity[2]); - info_log_formatted(logger, "\nHeader: 0x%08X\nData: %s\nParity: %s", - msg_ext.header, data_str, parity_str); + info_log_formatted(logger, "\nHeader: 0x%08X\nData: %s\nParity: %s", + msg_ext.header, data_str, parity_str); } /// @brief Process a short message /// @param full_message /// @param logger void adsb_short_message_parse(uint8_t full_message[ADSB_SHORT_LEN], - Logger *logger) { - adsbShortMessage msg_short; - msg_short.header = (full_message[0] << 24) | (full_message[1] << 16) | - (full_message[2] << 8) | full_message[3]; - memcpy(msg_short.parity, full_message + 4, 3); + Logger *logger) { + adsbShortMessage msg_short; + msg_short.header = (full_message[0] << 24) | (full_message[1] << 16) | + (full_message[2] << 8) | full_message[3]; + memcpy(msg_short.parity, full_message + 4, 3); - char parity_str[3 * 3]; // Enough for "XX XX XX\0" - snprintf(parity_str, sizeof(parity_str), "%02X %02X %02X", - msg_short.parity[0], msg_short.parity[1], msg_short.parity[2]); + char parity_str[3 * 3]; // Enough for "XX XX XX\0" + snprintf(parity_str, sizeof(parity_str), "%02X %02X %02X", + msg_short.parity[0], msg_short.parity[1], msg_short.parity[2]); - info_log_formatted(logger, "\nHeader: 0x%08X\nParity: %s", msg_short.header, - parity_str); + info_log_formatted(logger, "\nHeader: 0x%08X\nParity: %s", msg_short.header, + parity_str); } diff --git a/src/arg_parser.c b/src/arg_parser.c index 821cc07..7d0cb0b 100644 --- a/src/arg_parser.c +++ b/src/arg_parser.c @@ -13,64 +13,64 @@ */ void arguments_parse(int argc, char *argv[], Options *opts) { - opts->tcp_server = "172.22.132.124"; - opts->tcp_port = 30002; - opts->verbose = 0; - opts->err_log = NULL; - opts->out_log = NULL; - opts->log_level = "ERROR"; + opts->tcp_server = "172.22.132.124"; + opts->tcp_port = 30002; + opts->verbose = 0; + opts->err_log = NULL; + opts->out_log = NULL; + opts->log_level = "ERROR"; - static struct option long_options[] = { - {"tcp-server", optional_argument, 0, 't'}, - {"tcp-port", optional_argument, 0, 'p'}, - {"log", optional_argument, 0, 'o'}, - {"error-log", optional_argument, 0, 'e'}, - {"log-level", optional_argument, 0, 'l'}, - {"verbose", no_argument, 0, 'v'}, - {"help", no_argument, 0, 'h'}, - {0, 0, 0, 0}}; + static struct option long_options[] = { + {"tcp-server", optional_argument, 0, 't'}, + {"tcp-port", optional_argument, 0, 'p'}, + {"log", optional_argument, 0, 'o'}, + {"error-log", optional_argument, 0, 'e'}, + {"log-level", optional_argument, 0, 'l'}, + {"verbose", no_argument, 0, 'v'}, + {"help", no_argument, 0, 'h'}, + {0, 0, 0, 0}}; - int opt; - while ((opt = getopt_long(argc, argv, "t:p:o:e:l:vh", long_options, NULL)) != - -1) { - switch (opt) { - case 't': - if (optarg) { - opts->tcp_server = optarg; - } - break; - case 'p': - if (optarg) { - opts->tcp_port = atoi(optarg); - } - break; - case 'o': - if (optarg) { - opts->out_log = optarg; - } - break; - case 'e': - if (optarg) { - opts->err_log = optarg; - } - break; - case 'l': - if (optarg) { - opts->log_level = optarg; - to_upper_case(opts->log_level); - } - break; - case 'v': - opts->verbose = 1; - break; - case 'h': - printf("Usage: %s [-t | --tcp-server ] [-p | --tcp-port " - "value] [-o | --log value] [-e | --error-log value] [-l | " - "--log-level value [-v | --verbose] [-h | --help]\n", - argv[0]); - exit(0); - default: - exit(1); - } - } + int opt; + while ((opt = getopt_long(argc, argv, "t:p:o:e:l:vh", long_options, NULL)) != + -1) { + switch (opt) { + case 't': + if (optarg) { + opts->tcp_server = optarg; + } + break; + case 'p': + if (optarg) { + opts->tcp_port = atoi(optarg); + } + break; + case 'o': + if (optarg) { + opts->out_log = optarg; + } + break; + case 'e': + if (optarg) { + opts->err_log = optarg; + } + break; + case 'l': + if (optarg) { + opts->log_level = optarg; + to_upper_case(opts->log_level); + } + break; + case 'v': + opts->verbose = 1; + break; + case 'h': + printf("Usage: %s [-t | --tcp-server ] [-p | --tcp-port " + "value] [-o | --log value] [-e | --error-log value] [-l | " + "--log-level value [-v | --verbose] [-h | --help]\n", + argv[0]); + exit(0); + default: + exit(1); + } + } } diff --git a/src/client.c b/src/client.c index 983da75..cea2a04 100644 --- a/src/client.c +++ b/src/client.c @@ -1,5 +1,5 @@ -#include "../include/client.h" #include "../../crux/include/logger.h" +#include "../include/client.h" #include #include @@ -16,16 +16,16 @@ * @return int8_t 0 on success, -1 on failure */ inline int8_t tcp_client_thread_init(TcpClientArgs *tcp_client_args, - ringBuffer *ring_buffer, Options *opts, - Logger *logger) { - if (!tcp_client_args) { - return -1; - } - tcp_client_args->rb = ring_buffer; - tcp_client_args->opts = opts; - tcp_client_args->logger = logger; - - return 0; + ringBuffer *ring_buffer, Options *opts, + Logger *logger) { + if (!tcp_client_args) { + return -1; + } + tcp_client_args->rb = ring_buffer; + tcp_client_args->opts = opts; + tcp_client_args->logger = logger; + + return 0; } /** @@ -35,42 +35,42 @@ inline int8_t tcp_client_thread_init(TcpClientArgs *tcp_client_args, * @return void* */ void *tcp_client_thread(void *arg) { - TcpClientArgs *args = (TcpClientArgs *)arg; + TcpClientArgs *args = (TcpClientArgs *)arg; - Options *opts = args->opts; - ringBuffer *rb = args->rb; - Logger *logger = args->logger; + Options *opts = args->opts; + ringBuffer *rb = args->rb; + Logger *logger = args->logger; - int sockfd; - struct sockaddr_in server_addr; + int sockfd; + struct sockaddr_in server_addr; - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - error_log(logger, "Socket creation failed"); - exit(EXIT_FAILURE); - } + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + error_log(logger, "Socket creation failed"); + exit(EXIT_FAILURE); + } - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(opts->tcp_port); - inet_pton(AF_INET, opts->tcp_server, &server_addr.sin_addr); + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(opts->tcp_port); + inet_pton(AF_INET, opts->tcp_server, &server_addr.sin_addr); - info_log_formatted(logger, "Connecting to server %s:%d\n", opts->tcp_server, - opts->tcp_port); + info_log_formatted(logger, "Connecting to server %s:%d\n", opts->tcp_server, + opts->tcp_port); - if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == - -1) { - error_log(logger, "Connection failed"); - close(sockfd); - exit(EXIT_FAILURE); - } + if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == + -1) { + error_log(logger, "Connection failed"); + close(sockfd); + exit(EXIT_FAILURE); + } - info_log_formatted(logger, "Connected to server %s:%d\n", opts->tcp_server, - opts->tcp_port); + info_log_formatted(logger, "Connected to server %s:%d\n", opts->tcp_server, + opts->tcp_port); - lines_read(sockfd, rb); + lines_read(sockfd, rb); - info_log(logger, "Server disconnected."); - close(sockfd); - return NULL; + info_log(logger, "Server disconnected."); + close(sockfd); + return NULL; } /** @@ -80,28 +80,28 @@ void *tcp_client_thread(void *arg) { * @param rb Ring buffer */ void lines_read(int sockfd, ringBuffer *rb) { - char buffer[128 * 1024]; // Temporary buffer - char line[MAX_LINE_LENGTH + 1]; // Stores extracted line - int line_pos = 0; - - while (1) { - ssize_t bytes_read = read(sockfd, buffer, sizeof(buffer) - 1); - if (bytes_read <= 0) - break; // Exit on error or disconnect - - buffer[bytes_read] = '\0'; - - // Process the buffer character by character - for (int i = 0; i < bytes_read; i++) { - if (buffer[i] == '\n' || line_pos >= MAX_LINE_LENGTH) { - line[line_pos] = '\0'; - if (line_pos > 0) { - ring_buffer_insert(rb, line); - } - line_pos = 0; - } else { - line[line_pos++] = buffer[i]; - } - } - } + char buffer[128 * 1024]; // Temporary buffer + char line[MAX_LINE_LENGTH + 1]; // Stores extracted line + int line_pos = 0; + + while (1) { + ssize_t bytes_read = read(sockfd, buffer, sizeof(buffer) - 1); + if (bytes_read <= 0) + break; // Exit on error or disconnect + + buffer[bytes_read] = '\0'; + + // Process the buffer character by character + for (int i = 0; i < bytes_read; i++) { + if (buffer[i] == '\n' || line_pos >= MAX_LINE_LENGTH) { + line[line_pos] = '\0'; + if (line_pos > 0) { + ring_buffer_insert(rb, line); + } + line_pos = 0; + } else { + line[line_pos++] = buffer[i]; + } + } + } } diff --git a/src/processor.c b/src/processor.c index 340a2e8..345432f 100644 --- a/src/processor.c +++ b/src/processor.c @@ -1,14 +1,8 @@ -#include "../../crux/include/linear_arena.h" #include "../../crux/include/logger.h" #include "../../crux/include/utils.h" -#include "../include/ads_b.h" -#include "../include/arg_parser.h" -#include "../include/client.h" -#include "../include/processor.h" #include "../include/processor.h" +#include "../include/ads_b.h" #include "../include/ring_buffer.h" -#include "../include/ring_buffer.h" -#include "../include/utils.h" #include #include #include @@ -21,54 +15,54 @@ /// @return int8_t 0 on success, -1 on failure inline int8_t data_processor_thread_args_init(ProcessorArgs *data_processor_args, - ringBuffer *ring_buffer, Options *opts, - Logger *logger) { - if (!data_processor_args) { - return -1; - } - data_processor_args->rb = ring_buffer; - data_processor_args->opts = opts; - data_processor_args->logger = logger; + ringBuffer *ring_buffer, Options *opts, + Logger *logger) { + if (!data_processor_args) { + return -1; + } + data_processor_args->rb = ring_buffer; + data_processor_args->opts = opts; + data_processor_args->logger = logger; - return 0; + return 0; } /// @brief Data processor thread /// @param arg Processor arguments /// @return void* void *data_processor_thread(void *arg) { - ProcessorArgs *args = (ProcessorArgs *)arg; + ProcessorArgs *args = (ProcessorArgs *)arg; - ringBuffer *rb = args->rb; - Logger *logger = args->logger; + ringBuffer *rb = args->rb; + Logger *logger = args->logger; - while (1) { - char line[MAX_LINE_LENGTH + 1]; - if (ring_buffer_get(rb, line) == 0) { - squitter_strip_chars(line); - debug_log(logger, line); - debug_log_formatted( - logger, "Head: %2d, tail: %2d, entries: %2d", rb->head, rb->tail, - rb->head < rb->tail ? rb->head - rb->tail + BUFFER_SIZE - : rb->head - rb->tail); - adsb_squitter_parse(logger, line); - } - } - return NULL; + while (1) { + char line[MAX_LINE_LENGTH + 1]; + if (ring_buffer_get(rb, line) == 0) { + squitter_strip_chars(line); + debug_log(logger, line); + debug_log_formatted( + logger, "Head: %2d, tail: %2d, entries: %2d", rb->head, rb->tail, + rb->head < rb->tail ? rb->head - rb->tail + BUFFER_SIZE + : rb->head - rb->tail); + adsb_squitter_parse(logger, line); + } + } + return NULL; } /// @brief Strip characters from a string /// @param str String to strip characters from (in-place) /// @return void size_t squitter_strip_chars(char *str) { - int i, j = 0; - int len = strlen(str); + int i, j = 0; + int len = strlen(str); - for (i = 0; i < len; i++) { - if (str[i] != '@' && str[i] != ';') { - str[j++] = str[i]; - } - } - str[j] = '\0'; - return j; + for (i = 0; i < len; i++) { + if (str[i] != '@' && str[i] != ';') { + str[j++] = str[i]; + } + } + str[j] = '\0'; + return j; } diff --git a/src/ring_buffer.c b/src/ring_buffer.c index f266b44..df573df 100644 --- a/src/ring_buffer.c +++ b/src/ring_buffer.c @@ -6,40 +6,40 @@ * Initialize the ring buffer */ void ring_buffer_init(ringBuffer *rb) { - rb->head = 0; - rb->tail = 0; - pthread_mutex_init(&rb->mutex, NULL); - pthread_cond_init(&rb->not_empty, NULL); - pthread_cond_init(&rb->not_full, NULL); + rb->head = 0; + rb->tail = 0; + pthread_mutex_init(&rb->mutex, NULL); + pthread_cond_init(&rb->not_empty, NULL); + pthread_cond_init(&rb->not_full, NULL); } /** * Insert a new line into the ring buffer */ void ring_buffer_insert(ringBuffer *rb, const char *line) { - pthread_mutex_lock(&rb->mutex); - while (((rb->head + 1) & BUFFER_MASK) == rb->tail) { - pthread_cond_wait(&rb->not_full, &rb->mutex); // Wait if buffer is full - } - strncpy(rb->buffer[rb->head], line, MAX_LINE_LENGTH); - rb->buffer[rb->head][MAX_LINE_LENGTH] = '\0'; // Ensure null termination - rb->head = (rb->head + 1) & BUFFER_MASK; - pthread_cond_signal(&rb->not_empty); // Signal that buffer is not empty - pthread_mutex_unlock(&rb->mutex); + pthread_mutex_lock(&rb->mutex); + while (((rb->head + 1) & BUFFER_MASK) == rb->tail) { + pthread_cond_wait(&rb->not_full, &rb->mutex); // Wait if buffer is full + } + strncpy(rb->buffer[rb->head], line, MAX_LINE_LENGTH); + rb->buffer[rb->head][MAX_LINE_LENGTH] = '\0'; // Ensure null termination + rb->head = (rb->head + 1) & BUFFER_MASK; + pthread_cond_signal(&rb->not_empty); // Signal that buffer is not empty + pthread_mutex_unlock(&rb->mutex); } /** * Retrieve a line from the ring buffer */ int ring_buffer_get(ringBuffer *rb, char *line) { - pthread_mutex_lock(&rb->mutex); - while (rb->head == rb->tail) { - pthread_cond_wait(&rb->not_empty, &rb->mutex); // Wait if buffer is empty - } - strncpy(line, rb->buffer[rb->tail], MAX_LINE_LENGTH); - line[MAX_LINE_LENGTH] = '\0'; - rb->tail = (rb->tail + 1) & BUFFER_MASK; - pthread_cond_signal(&rb->not_full); // Signal that buffer is not full - pthread_mutex_unlock(&rb->mutex); - return 0; + pthread_mutex_lock(&rb->mutex); + while (rb->head == rb->tail) { + pthread_cond_wait(&rb->not_empty, &rb->mutex); // Wait if buffer is empty + } + strncpy(line, rb->buffer[rb->tail], MAX_LINE_LENGTH); + line[MAX_LINE_LENGTH] = '\0'; + rb->tail = (rb->tail + 1) & BUFFER_MASK; + pthread_cond_signal(&rb->not_full); // Signal that buffer is not full + pthread_mutex_unlock(&rb->mutex); + return 0; }