From 21af9aa631e21593c312c0db3ff6c610f266800b Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sat, 21 Mar 2026 13:34:56 +1300 Subject: [PATCH] refactor: improve C port robustness and maintainability - Graceful error handling: replace errx() in request parsing with goto cleanup that sends an error term back to Erlang and continues processing. Port no longer crashes on malformed requests. - Fix memory leaks: cleanup path properly frees all partially allocated resources (pid, ref, url, headers, cookies, post_data, eopts) on parse failure. - Exponential buffer growth in write_cb: use doubling strategy starting at 4KB instead of exact-fit realloc on every chunk. Reduces allocations from O(n) to O(log n) for large responses. - Split erl_input into focused helpers: parse_slist, parse_body, parse_eopts, free_eopts, send_parse_error_to_erlang. Main function reduced from 296 to ~80 lines. - Move global to_erlang/from_erlang bufferevents into GlobalInfo struct, eliminating module-level mutable state. - Replace atoi with strtol for CLI argument parsing. - Remove obsolete max_pipeline_length option (HTTP/1.1 pipelining is deprecated and ignored by modern curl). --- README.md | 1 - c_src/katipo.c | 595 ++++++++++++++++++++++++------------------ src/katipo.erl | 11 +- test/katipo_SUITE.erl | 43 ++- 4 files changed, 366 insertions(+), 284 deletions(-) diff --git a/README.md b/README.md index dd144aa..b3336ba 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,6 @@ katipo:Method(Pool :: atom(), URL :: binary(), ReqOptions :: map()). | Option | Type | Default | Note | |:------------------------|:------------------------------|:-------------|:-----------------------------------------------------------------------------------------------| | `pipelining` | `nothing`
`http1`
`multiplex` | `nothing` | HTTP pipelining [CURLMOPT_PIPELINING](https://curl.se/libcurl/c/CURLMOPT_PIPELINING.html) | -| `max_pipeline_length` | `non_neg_integer()` | 100 | | | `max_total_connections` | `non_neg_integer()` | 0 (no limit) | [docs](https://curl.se/libcurl/c/CURLMOPT_MAX_TOTAL_CONNECTIONS.html) | | `max_concurrent_streams`| `non_neg_integer()` | 100 | [docs](https://curl.se/libcurl/c/CURLMOPT_MAX_CONCURRENT_STREAMS.html) curl >= 7.67.0 | diff --git a/c_src/katipo.c b/c_src/katipo.c index b56410b..d35f2a0 100644 --- a/c_src/katipo.c +++ b/c_src/katipo.c @@ -58,9 +58,6 @@ #define K_CURLAUTH_NTLM 103 #define K_CURLAUTH_NEGOTIATE 104 -struct bufferevent *to_erlang; -struct bufferevent *from_erlang; - typedef struct _GlobalInfo { struct event_base *evbase; struct event *timer_event; @@ -69,6 +66,8 @@ typedef struct _GlobalInfo { int still_running; size_t to_get; curl_version_info_data *ver; + struct bufferevent *to_erlang; + struct bufferevent *from_erlang; } GlobalInfo; typedef struct _ConnInfo { @@ -78,6 +77,7 @@ typedef struct _ConnInfo { erlang_ref *ref; char *memory; size_t size; + size_t capacity; GlobalInfo *global; char error[CURL_ERROR_SIZE]; size_t num_headers; @@ -400,18 +400,18 @@ static void mcode_or_die(const char *where, CURLMcode code) { } } -static void send_to_erlang(char *buffer, size_t buffer_len) { +static void send_to_erlang(GlobalInfo *global, char *buffer, size_t buffer_len) { u_int32_t erl_pkt_len; char size_buf[4]; erl_pkt_len = htonl((uint32_t)buffer_len); (void)memcpy(size_buf, &erl_pkt_len, sizeof(erl_pkt_len)); - if (bufferevent_write(to_erlang, size_buf, sizeof(size_buf)) < 0) { + if (bufferevent_write(global->to_erlang, size_buf, sizeof(size_buf)) < 0) { errx(2, "bufferevent_write"); } - if (bufferevent_write(to_erlang, buffer, buffer_len) < 0) { + if (bufferevent_write(global->to_erlang, buffer, buffer_len) < 0) { errx(2, "bufferevent_write"); } } @@ -521,7 +521,7 @@ static void send_ok_to_erlang(ConnInfo *conn) { encode_metrics(&result, conn); - send_to_erlang(result.buff, result.index); + send_to_erlang(conn->global, result.buff, result.index); ei_x_free(&result); } @@ -551,7 +551,7 @@ static void send_error_to_erlang(CURLcode curl_code, ConnInfo *conn) { encode_metrics(&result, conn); - send_to_erlang(result.buff, result.index); + send_to_erlang(conn->global, result.buff, result.index); ei_x_free(&result); } @@ -690,13 +690,20 @@ static int multi_timer_cb(CURLM *multi, long timeout_ms, void *userp) { static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; ConnInfo *conn = (ConnInfo *)data; - char *new_memory; - new_memory = (char *)realloc(conn->memory, conn->size + realsize); - if (new_memory == NULL) { - return 0; + size_t needed = conn->size + realsize; + if (needed > conn->capacity) { + size_t new_cap = conn->capacity ? conn->capacity : 4096; + while (new_cap < needed) { + new_cap *= 2; + } + char *new_memory = (char *)realloc(conn->memory, new_cap); + if (new_memory == NULL) { + return 0; + } + conn->memory = new_memory; + conn->capacity = new_cap; } - conn->memory = new_memory; memcpy(&(conn->memory[conn->size]), ptr, realsize); conn->size += realsize; @@ -779,8 +786,9 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers, conn = calloc(1, sizeof(*conn)); conn->error[0] = '\0'; - conn->memory = (char *)malloc(1); + conn->memory = NULL; conn->size = 0; + conn->capacity = 0; conn->num_headers = 0; conn->resp_headers = NULL; @@ -952,46 +960,272 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers, mcode_or_die("new_conn: curl_multi_add_handle", rc); } +static void send_parse_error_to_erlang(GlobalInfo *global, + erlang_pid *pid, erlang_ref *ref, + const char *msg) { + ei_x_buff result; + size_t msg_len = strlen(msg); + + if (ei_x_new_with_version(&result) || + ei_x_encode_tuple_header(&result, 2) || + ei_x_encode_atom(&result, "error") || + ei_x_encode_tuple_header(&result, 2) || + ei_x_encode_tuple_header(&result, 2) || + ei_x_encode_pid(&result, pid) || + ei_x_encode_ref(&result, ref) || + ei_x_encode_tuple_header(&result, 3) || + ei_x_encode_atom(&result, "bad_opts") || + ei_x_encode_binary(&result, msg, msg_len) || + ei_x_encode_empty_list(&result)) { + errx(2, "Failed to encode parse error"); + } + + send_to_erlang(global, result.buff, result.index); + ei_x_free(&result); +} + +static int parse_slist(char *buf, int *index, struct curl_slist **out) { + int num_items; + int erl_type; + int size; + long sizel; + + if (ei_decode_list_header(buf, index, &num_items)) { + return -1; + } + *out = NULL; + for (int i = 0; i < num_items; i++) { + if (ei_get_type(buf, index, &erl_type, &size)) { + return -1; + } + char *item = (char *)malloc(size + 1); + if (ei_decode_binary(buf, index, item, &sizel)) { + free(item); + return -1; + } + item[size] = '\0'; + *out = curl_slist_append(*out, item); + free(item); + } + if (num_items > 0 && ei_skip_term(buf, index)) { + return -1; + } + return 0; +} + +static int parse_body(char *buf, int *index, char **out_data, int *out_size) { + int erl_type; + int size; + int index_prev; + + if (ei_get_type(buf, index, &erl_type, &size)) { + return -1; + } + index_prev = *index; + if (ei_decode_iodata(buf, index, out_size, NULL)) { + return -1; + } + *index = index_prev; + *out_data = (char *)malloc(*out_size); + if (ei_decode_iodata(buf, index, out_size, *out_data)) { + free(*out_data); + *out_data = NULL; + return -1; + } + return 0; +} + +static int parse_eopts(char *buf, int *index, EasyOpts *eopts) { + int num_eopts; + int arity; + int erl_type; + int size; + long eopt; + long eopt_long; + long sizel; + char *eopt_binary; + + if (ei_decode_list_header(buf, index, &num_eopts)) { + return -1; + } + for (int i = 0; i < num_eopts; i++) { + if (ei_decode_tuple_header(buf, index, &arity) || + ei_decode_long(buf, index, &eopt)) { + return -1; + } + + if (ei_get_type(buf, index, &erl_type, &size)) { + return -1; + } + switch (erl_type) { + case ERL_SMALL_INTEGER_EXT: + case ERL_INTEGER_EXT: + if (ei_decode_long(buf, index, &eopt_long)) { + return -1; + } + switch (eopt) { + case K_CURLOPT_CONNECTTIMEOUT_MS: + eopts->curlopt_connecttimeout_ms = eopt_long; + break; + case K_CURLOPT_FOLLOWLOCATION: + eopts->curlopt_followlocation = eopt_long; + break; + case K_CURLOPT_SSL_VERIFYHOST: + eopts->curlopt_ssl_verifyhost = eopt_long; + break; + case K_CURLOPT_SSL_VERIFYPEER: + eopts->curlopt_ssl_verifypeer = eopt_long; + break; + case K_CURLOPT_TIMEOUT_MS: + eopts->curlopt_timeout_ms = eopt_long; + break; + case K_CURLOPT_MAXREDIRS: + eopts->curlopt_maxredirs = eopt_long; + break; + case K_CURLOPT_TCP_FASTOPEN: + eopts->curlopt_tcp_fastopen = eopt_long; + break; + case K_CURLOPT_HTTP_AUTH: + if (eopt_long == K_CURLAUTH_BASIC) { + eopts->curlopt_http_auth = CURLAUTH_BASIC; + } else if (eopt_long == K_CURLAUTH_DIGEST) { + eopts->curlopt_http_auth = CURLAUTH_DIGEST; + } else if (eopt_long == K_CURLAUTH_NTLM) { + eopts->curlopt_http_auth = CURLAUTH_NTLM; + } else if (eopt_long == K_CURLAUTH_NEGOTIATE) { + eopts->curlopt_http_auth = CURLAUTH_NEGOTIATE; + } else if (eopt_long != K_CURLAUTH_UNDEFINED) { + return -1; + } + break; + case K_CURLOPT_HTTP_VERSION: + eopts->curlopt_http_version = eopt_long; + break; + case K_CURLOPT_VERBOSE: + eopts->curlopt_verbose = eopt_long; + break; + case K_CURLOPT_SSLVERSION: + eopts->curlopt_sslversion = eopt_long; + break; + case K_CURLOPT_DNS_CACHE_TIMEOUT: + eopts->curlopt_dns_cache_timeout = eopt_long; + break; + case K_CURLOPT_CA_CACHE_TIMEOUT: + eopts->curlopt_ca_cache_timeout = eopt_long; + break; + case K_CURLOPT_PIPEWAIT: + eopts->curlopt_pipewait = eopt_long; + break; + default: + break; + } + break; + case ERL_BINARY_EXT: + eopt_binary = (char *)malloc(size + 1); + if (ei_decode_binary(buf, index, eopt_binary, &sizel)) { + free(eopt_binary); + return -1; + } + eopt_binary[size] = '\0'; + switch (eopt) { + case K_CURLOPT_CAPATH: + eopts->curlopt_capath = eopt_binary; + break; + case K_CURLOPT_CACERT: + eopts->curlopt_cacert = eopt_binary; + break; + case K_CURLOPT_USERNAME: + eopts->curlopt_username = eopt_binary; + break; + case K_CURLOPT_PASSWORD: + eopts->curlopt_password = eopt_binary; + break; + case K_CURLOPT_PROXY: + eopts->curlopt_proxy = eopt_binary; + break; + case K_CURLOPT_INTERFACE: + eopts->curlopt_interface = eopt_binary; + break; + case K_CURLOPT_UNIX_SOCKET_PATH: + eopts->curlopt_unix_socket_path = eopt_binary; + break; + case K_CURLOPT_DOH_URL: + eopts->curlopt_doh_url = eopt_binary; + break; + case K_CURLOPT_SSLCERT: + eopts->curlopt_sslcert = eopt_binary; + break; + case K_CURLOPT_SSLKEY: + eopts->curlopt_sslkey = eopt_binary; + break; + case K_CURLOPT_SSLKEY_BLOB: + eopts->curlopt_sslkey_blob = eopt_binary; + eopts->curlopt_sslkey_blob_size = size; + break; + case K_CURLOPT_KEYPASSWD: + eopts->curlopt_keypasswd = eopt_binary; + break; + case K_CURLOPT_USERPWD: + eopts->curlopt_userpwd = eopt_binary; + break; + default: + free(eopt_binary); + } + break; + case ERL_ATOM_EXT: + if (ei_skip_term(buf, index)) { + return -1; + } + break; + default: + return -1; + } + } + + if (num_eopts > 0 && ei_skip_term(buf, index)) { + return -1; + } + return 0; +} + +static void free_eopts(EasyOpts *eopts) { + free(eopts->curlopt_capath); + free(eopts->curlopt_cacert); + free(eopts->curlopt_username); + free(eopts->curlopt_password); + free(eopts->curlopt_proxy); + free(eopts->curlopt_interface); + free(eopts->curlopt_unix_socket_path); + free(eopts->curlopt_doh_url); + free(eopts->curlopt_sslcert); + free(eopts->curlopt_sslkey); + free(eopts->curlopt_sslkey_blob); + free(eopts->curlopt_keypasswd); + free(eopts->curlopt_userpwd); +} + static void erl_input(struct bufferevent *ev, void *arg) { u_int32_t len; size_t data_read; char *buf; int index; - int index_prev; int version; int arity; int erl_type; int size; long sizel; - erlang_pid *pid; - erlang_ref *ref; long method; - char *url; GlobalInfo *global = (GlobalInfo *)arg; - struct evbuffer *input = bufferevent_get_input(from_erlang); - struct curl_slist *req_headers = NULL; - struct curl_slist *req_cookies = NULL; - char *header; - char *cookie; - int num_headers; - int num_cookies; - int i; - char *post_data; - int post_data_size; - EasyOpts eopts; - int num_eopts; - long eopt; - long eopt_long; - char* eopt_binary = NULL; + struct evbuffer *input = bufferevent_get_input(global->from_erlang); while (global->to_get > 0 || evbuffer_get_length(input) > sizeof(len)) { if (global->to_get > 0) { len = global->to_get; global->to_get = 0; } else { - if (bufferevent_read(from_erlang, &len, sizeof(len)) != sizeof(len)) { - errx(2, "Couldn't allocate len"); + if (bufferevent_read(global->from_erlang, &len, sizeof(len)) != sizeof(len)) { + errx(2, "Couldn't read packet length"); } len = ntohl(len); } @@ -1001,94 +1235,63 @@ static void erl_input(struct bufferevent *ev, void *arg) { break; } - buf = (char *)malloc(len); + /* Peek at first byte — valid ETF always starts with version tag 131 */ + { + unsigned char first_byte; + if (evbuffer_copyout(input, &first_byte, 1) != 1 || first_byte != 131) { + errx(2, "Invalid packet: not an Erlang term (first byte: %u)", first_byte); + } + } - data_read = bufferevent_read(from_erlang, buf, len); + buf = (char *)malloc(len); + data_read = bufferevent_read(global->from_erlang, buf, len); if (data_read != len) { errx(2, "Wanted to read %u bytes data but got %zu", len, data_read); } - index = 0; + /* Per-request state — initialized here so goto cleanup can free safely */ + erlang_pid *pid = NULL; + erlang_ref *ref = NULL; + int have_identity = 0; + char *url = NULL; + struct curl_slist *req_headers = NULL; + struct curl_slist *req_cookies = NULL; + char *post_data = NULL; + int post_data_size = 0; + EasyOpts eopts = {0}; + index = 0; pid = malloc(sizeof(*pid)); ref = malloc(sizeof(*ref)); + if (ei_decode_version(buf, &index, &version) || ei_decode_tuple_header(buf, &index, &arity) || ei_decode_pid(buf, &index, pid) || ei_decode_ref(buf, &index, ref) || ei_decode_long(buf, &index, &method) || ei_get_type(buf, &index, &erl_type, &size)) { - errx(2, "Couldn't read req"); + goto cleanup; } + have_identity = 1; url = (char *)malloc(size + 1); - if (ei_decode_binary(buf, &index, url, &sizel)) { - errx(2, "Couldn't read url"); + goto cleanup; } - url[size] = '\0'; - if (ei_decode_list_header(buf, &index, &num_headers)) { - errx(2, "Couldn't decode headers length"); - } - req_headers = NULL; - for (i = 0; i < num_headers; i++) { - if (ei_get_type(buf, &index, &erl_type, &size)) { - errx(2, "Couldn't read header size"); - } - header = (char *)malloc(size + 1); - if (ei_decode_binary(buf, &index, header, &sizel)) { - errx(2, "Couldn't read header"); - } - header[size] = '\0'; - req_headers = curl_slist_append(req_headers, header); - free(header); - } - - if (num_headers > 0 && ei_skip_term(buf, &index)) { - errx(2, "Couldn't skip empty list"); - } - - if (ei_decode_list_header(buf, &index, &num_cookies)) { - errx(2, "Couldn't decode cookies length"); - } - req_cookies = NULL; - for (i = 0; i < num_cookies; i++) { - if (ei_get_type(buf, &index, &erl_type, &size)) { - errx(2, "Couldn't read cookie size"); - } - cookie = (char *)malloc(size + 1); - if (ei_decode_binary(buf, &index, cookie, &sizel)) { - errx(2, "Couldn't read cookie"); - } - cookie[size] = '\0'; - req_cookies = curl_slist_append(req_cookies, cookie); - free(cookie); - } - - if (num_cookies > 0 && ei_skip_term(buf, &index)) { - errx(2, "Couldn't skip empty list"); + if (parse_slist(buf, &index, &req_headers) != 0) { + goto cleanup; } - if (ei_get_type(buf, &index, &erl_type, &size)) { - errx(2, "Couldn't read req body size"); - } - - index_prev = index; - - if (ei_decode_iodata(buf, &index, &post_data_size, NULL)) { - errx(2, "Couldn't read req body size"); + if (parse_slist(buf, &index, &req_cookies) != 0) { + goto cleanup; } - index = index_prev; - post_data = (char *)malloc(post_data_size); - - if (ei_decode_iodata(buf, &index, &post_data_size, post_data)) { - errx(2, "Couldn't read req body"); + if (parse_body(buf, &index, &post_data, &post_data_size) != 0) { + goto cleanup; } - eopts = (EasyOpts){ .curlopt_connecttimeout_ms = 30000, .curlopt_ssl_verifyhost = 2, @@ -1097,156 +1300,34 @@ static void erl_input(struct bufferevent *ev, void *arg) { .curlopt_maxredirs = 9, .curlopt_http_auth = -1, .curlopt_dns_cache_timeout = 60, - .curlopt_ca_cache_timeout = 86400, /* 24 hours - libcurl default */ - .curlopt_pipewait = 1, /* Enable by default for better HTTP/2 multiplexing */ + .curlopt_ca_cache_timeout = 86400, + .curlopt_pipewait = 1, }; - if (ei_decode_list_header(buf, &index, &num_eopts)) { - errx(2, "Couldn't decode eopts length"); - } - for (i = 0; i < num_eopts; i++) { - if (ei_decode_tuple_header(buf, &index, &arity) || - ei_decode_long(buf, &index, &eopt)) { - errx(2, "Couldn't read eopt tuple"); - } - - if (ei_get_type(buf, &index, &erl_type, &size)) { - errx(2, "Couldn't read eopt type"); - } - switch (erl_type) { - case ERL_SMALL_INTEGER_EXT: - case ERL_INTEGER_EXT: - if (ei_decode_long(buf, &index, &eopt_long)) { - errx(2, "Couldn't read eopt long value"); - } - switch (eopt) { - case K_CURLOPT_CONNECTTIMEOUT_MS: - eopts.curlopt_connecttimeout_ms = eopt_long; - break; - case K_CURLOPT_FOLLOWLOCATION: - eopts.curlopt_followlocation = eopt_long; - break; - case K_CURLOPT_SSL_VERIFYHOST: - eopts.curlopt_ssl_verifyhost = eopt_long; - break; - case K_CURLOPT_SSL_VERIFYPEER: - eopts.curlopt_ssl_verifypeer = eopt_long; - break; - case K_CURLOPT_TIMEOUT_MS: - eopts.curlopt_timeout_ms = eopt_long; - break; - case K_CURLOPT_MAXREDIRS: - eopts.curlopt_maxredirs = eopt_long; - break; - case K_CURLOPT_TCP_FASTOPEN: - eopts.curlopt_tcp_fastopen = eopt_long; - break; - case K_CURLOPT_HTTP_AUTH: - if (eopt_long == K_CURLAUTH_BASIC) { - eopts.curlopt_http_auth = CURLAUTH_BASIC; - } else if (eopt_long == K_CURLAUTH_DIGEST) { - eopts.curlopt_http_auth = CURLAUTH_DIGEST; - } else if (eopt_long == K_CURLAUTH_NTLM) { - eopts.curlopt_http_auth = CURLAUTH_NTLM; - } else if (eopt_long == K_CURLAUTH_NEGOTIATE) { - eopts.curlopt_http_auth = CURLAUTH_NEGOTIATE; - } else if (eopt_long != K_CURLAUTH_UNDEFINED) { - errx(2, "Unknown curlopt_http_auth value %ld", eopt_long); - } - break; - case K_CURLOPT_HTTP_VERSION: - eopts.curlopt_http_version = eopt_long; - break; - case K_CURLOPT_VERBOSE: - eopts.curlopt_verbose = eopt_long; - break; - case K_CURLOPT_SSLVERSION: - eopts.curlopt_sslversion = eopt_long; - break; - case K_CURLOPT_DNS_CACHE_TIMEOUT: - eopts.curlopt_dns_cache_timeout = eopt_long; - break; - case K_CURLOPT_CA_CACHE_TIMEOUT: - eopts.curlopt_ca_cache_timeout = eopt_long; - break; - case K_CURLOPT_PIPEWAIT: - eopts.curlopt_pipewait = eopt_long; - break; - default: - errx(2, "Unknown eopt long value %ld", eopt); - } - break; - case ERL_BINARY_EXT: - eopt_binary = (char *)malloc(size + 1); - if (ei_decode_binary(buf, &index, eopt_binary, &sizel)) { - errx(2, "Couldn't read eopt binary value"); - } - eopt_binary[size] = '\0'; - switch (eopt) { - case K_CURLOPT_CAPATH: - eopts.curlopt_capath = eopt_binary; - break; - case K_CURLOPT_CACERT: - eopts.curlopt_cacert = eopt_binary; - break; - case K_CURLOPT_USERNAME: - eopts.curlopt_username = eopt_binary; - break; - case K_CURLOPT_PASSWORD: - eopts.curlopt_password = eopt_binary; - break; - case K_CURLOPT_PROXY: - eopts.curlopt_proxy = eopt_binary; - break; - case K_CURLOPT_INTERFACE: - eopts.curlopt_interface = eopt_binary; - break; - case K_CURLOPT_UNIX_SOCKET_PATH: - eopts.curlopt_unix_socket_path = eopt_binary; - break; - case K_CURLOPT_DOH_URL: - eopts.curlopt_doh_url = eopt_binary; - break; - case K_CURLOPT_SSLCERT: - eopts.curlopt_sslcert = eopt_binary; - break; - case K_CURLOPT_SSLKEY: - eopts.curlopt_sslkey = eopt_binary; - break; - case K_CURLOPT_SSLKEY_BLOB: - eopts.curlopt_sslkey_blob = eopt_binary; - eopts.curlopt_sslkey_blob_size = size; - break; - case K_CURLOPT_KEYPASSWD: - eopts.curlopt_keypasswd = eopt_binary; - break; - case K_CURLOPT_USERPWD: - eopts.curlopt_userpwd = eopt_binary; - break; - default: - errx(2, "Unknown eopt binary value %ld", eopt); - } - break; - case ERL_ATOM_EXT: - // assuming this is 'undefined' == NULL - if (ei_skip_term(buf, &index)) { - errx(2, "Couldn't skip eopt atom value"); - } - break; - default: - errx(2, "Couldn't read eopt value '%c'", erl_type); - break; - } - } - - if (num_eopts > 0 && ei_skip_term(buf, &index)) { - errx(2, "Couldn't skip empty eopt list"); + if (parse_eopts(buf, &index, &eopts) != 0) { + goto cleanup; } new_conn(method, url, req_headers, req_cookies, post_data, post_data_size, eopts, pid, ref, arg); free(buf); + continue; + +cleanup: + if (have_identity) { + send_parse_error_to_erlang(global, pid, ref, "Couldn't read req"); + } else { + fprintf(stderr, "ERROR: Couldn't decode request identity (pid/ref)\n"); + } + free(pid); + free(ref); + free(url); + free(post_data); + curl_slist_free_all(req_headers); + curl_slist_free_all(req_cookies); + free_eopts(&eopts); + free(buf); } } @@ -1255,22 +1336,22 @@ static void erl_error(struct bufferevent *ev, short event, void *ud) { } static void erlang_init(GlobalInfo *global) { - from_erlang = + global->from_erlang = bufferevent_socket_new(global->evbase, STDIN_FILENO, 0); - if (from_erlang == NULL) { + if (global->from_erlang == NULL) { errx(2, "bufferevent_socket_new"); } - bufferevent_setcb(from_erlang, erl_input, NULL, erl_error, global); + bufferevent_setcb(global->from_erlang, erl_input, NULL, erl_error, global); - to_erlang = bufferevent_socket_new(global->evbase, STDOUT_FILENO, 0); - if (to_erlang == NULL) { + global->to_erlang = bufferevent_socket_new(global->evbase, STDOUT_FILENO, 0); + if (global->to_erlang == NULL) { errx(2, "bufferevent_socket_new"); } - bufferevent_setcb(to_erlang, NULL, NULL, erl_error, global); + bufferevent_setcb(global->to_erlang, NULL, NULL, erl_error, global); - bufferevent_setwatermark(from_erlang, EV_READ, 4, 0); - bufferevent_enable(from_erlang, EV_READ); - bufferevent_enable(to_erlang, EV_WRITE); + bufferevent_setwatermark(global->from_erlang, EV_READ, 4, 0); + bufferevent_enable(global->from_erlang, EV_READ); + bufferevent_enable(global->to_erlang, EV_WRITE); } int main(int argc, char **argv) { @@ -1282,7 +1363,6 @@ int main(int argc, char **argv) { struct option long_options[] = { { "pipelining", required_argument, 0, 'p' }, - { "max-pipeline-length", required_argument, 0, 'a' }, { "max-total-connections", required_argument, 0, 'c' }, { "max-concurrent-streams", required_argument, 0, 's' }, { 0, 0, 0, 0 } @@ -1339,30 +1419,25 @@ int main(int argc, char **argv) { curl_multi_setopt(global.multi, CURLMOPT_TIMERDATA, &global); while (1) { - c = getopt_long(argc, argv, "pa:c:s:", long_options, &option_index); + c = getopt_long(argc, argv, "p:c:s:", long_options, &option_index); if (c == -1) break; switch (c) { case 'p': - pipelining = atoi(optarg); + pipelining = strtol(optarg, NULL, 10); if (pipelining < 0 || pipelining > 2) { errx(2, "Bad pipelining arg '%ld'\n", pipelining); } - curl_multi_setopt(global.multi, CURLMOPT_PIPELINING, - atoi(optarg)); - break; - case 'a': - curl_multi_setopt(global.multi, CURLMOPT_MAX_PIPELINE_LENGTH, - atoi(optarg)); + curl_multi_setopt(global.multi, CURLMOPT_PIPELINING, pipelining); break; case 'c': curl_multi_setopt(global.multi, CURLMOPT_MAX_TOTAL_CONNECTIONS, - atoi(optarg)); + strtol(optarg, NULL, 10)); break; #if LIBCURL_VERSION_NUM >= 0x074300 /* 7.67.0 */ case 's': curl_multi_setopt(global.multi, CURLMOPT_MAX_CONCURRENT_STREAMS, - atoi(optarg)); + strtol(optarg, NULL, 10)); break; #endif default: @@ -1375,8 +1450,8 @@ int main(int argc, char **argv) { event_base_dispatch(global.evbase); /* Cleanup */ - bufferevent_free(from_erlang); - bufferevent_free(to_erlang); + bufferevent_free(global.from_erlang); + bufferevent_free(global.to_erlang); curl_multi_cleanup(global.multi); curl_share_cleanup(global.shobject); curl_global_cleanup(); diff --git a/src/katipo.erl b/src/katipo.erl index 6de8913..a1bf048 100644 --- a/src/katipo.erl +++ b/src/katipo.erl @@ -264,13 +264,8 @@ bad_opts. -type curlmopt() :: - %% curlmopt_chunk_length_penalty_size | - %% curlmopt_content_length_penalty_size | - %% curlmopt_max_host_connections | - max_pipeline_length | max_total_connections | max_concurrent_streams | - %% curlmopt_maxconnects | pipelining. %% curlmopt_pipelining_site_bl | %% curlmopt_pipelining_server_bl | @@ -406,8 +401,7 @@ sslversion_tlsv1_3. %% Minimum SSL/TLS version to use %% see [https://curl.se/libcurl/c/CURLOPT_SSLVERSION.html] --type curlmopts() :: [{max_pipeline_length, non_neg_integer()} | - {pipelining, pipelining()} | +-type curlmopts() :: [{pipelining, pipelining()} | {max_total_connections, non_neg_integer()} | {max_concurrent_streams, non_neg_integer()}]. @@ -844,9 +838,6 @@ get_mopts(Opts) -> end. -spec mopt_supported({curlmopt(), any()}) -> false | {true, any()}. -mopt_supported({max_pipeline_length, Val}) - when is_integer(Val) andalso Val >= 0 -> - {true, "--max-pipeline-length " ++ integer_to_list(Val)}; mopt_supported({pipelining, nothing}) -> {true, "--pipelining 0"}; mopt_supported({pipelining, http1}) -> diff --git a/test/katipo_SUITE.erl b/test/katipo_SUITE.erl index 114b79c..d6093da 100644 --- a/test/katipo_SUITE.erl +++ b/test/katipo_SUITE.erl @@ -176,10 +176,10 @@ groups() -> {pool, [], [pool_start_stop, worker_death, + port_garbage_input, port_death, port_late_response, pool_opts, - max_pipeline_length, max_concurrent_streams]}, {https, [parallel], [verify_host_verify_peer_ok, @@ -791,10 +791,12 @@ worker_death(Config) -> end, true = repeat_until_true(Fun3). -port_death(Config) -> +port_garbage_input(Config) -> + %% Sending garbage that bypasses {packet, 4} corrupts the pipe framing. + %% The port detects invalid ETF and exits. Supervisor recovers. Url = httpbin_url(Config, <<"/get">>), BaseOpts = ?config(httpbin_opts, Config), - PoolName = this_process_will_be_killed, + PoolName = port_garbage_test, PoolSize = 1, {ok, _} = katipo_pool:start(PoolName, PoolSize), WorkerName = wpool_pool:best_worker(PoolName), @@ -814,6 +816,31 @@ port_death(Config) -> true = repeat_until_true(Fun), ok = katipo_pool:stop(PoolName). +port_death(Config) -> + %% Killing the port OS process should be recovered by the supervisor + Url = httpbin_url(Config, <<"/get">>), + BaseOpts = ?config(httpbin_opts, Config), + PoolName = port_death_test, + PoolSize = 1, + {ok, _} = katipo_pool:start(PoolName, PoolSize), + WorkerName = wpool_pool:best_worker(PoolName), + WorkerPid = whereis(WorkerName), + {state, _, _, {state, Port, _}, _} = sys:get_state(WorkerPid), + {os_pid, OsPid} = erlang:port_info(Port, os_pid), + os:cmd("kill -9 " ++ integer_to_list(OsPid)), + Fun = fun() -> + WorkerName2 = wpool_pool:best_worker(PoolName), + WorkerPid2 = whereis(WorkerName2), + case sys:get_state(WorkerPid2) of + {state, _, _, {state, Port2, _}, _} when Port =/= Port2 -> + {ok, #{status := 200}} = + katipo:get(PoolName, Url, BaseOpts), + true + end + end, + true = repeat_until_true(Fun), + ok = katipo_pool:stop(PoolName). + port_late_response(Config) -> Url = httpbin_url(Config, <<"/delay/1">>), BaseOpts = ?config(httpbin_opts, Config), @@ -827,21 +854,11 @@ pool_opts(_) -> PoolName = pool_opts, PoolSize = 1, PoolOpts = [{pipelining, multiplex}, - {max_pipeline_length, 5}, {max_total_connections, 10}, {ignore_junk_opt, hithere}], {error, _} = katipo_pool:start(PoolName, PoolSize, PoolOpts), ok = katipo_pool:stop(PoolName). -max_pipeline_length(_) -> - PoolName = pool_opts, - PoolSize = 1, - PoolOpts = [{pipelining, multiplex}, - {max_pipeline_length, 5}, - {max_total_connections, 10}], - {ok, _} = katipo_pool:start(PoolName, PoolSize, PoolOpts), - ok = katipo_pool:stop(PoolName). - max_concurrent_streams(_) -> PoolName = pool_max_streams, PoolSize = 1,