diff --git a/README.md b/README.md
index dd144aa..b3336ba 100644
--- a/README.md
+++ b/README.md
@@ -123,7 +123,6 @@ katipo:Method(Pool :: atom(), URL :: binary(), ReqOptions :: map()).
| Option | Type | Default | Note |
|:------------------------|:------------------------------|:-------------|:-----------------------------------------------------------------------------------------------|
| `pipelining` | `nothing`
`http1`
`multiplex` | `nothing` | HTTP pipelining [CURLMOPT_PIPELINING](https://curl.se/libcurl/c/CURLMOPT_PIPELINING.html) |
-| `max_pipeline_length` | `non_neg_integer()` | 100 | |
| `max_total_connections` | `non_neg_integer()` | 0 (no limit) | [docs](https://curl.se/libcurl/c/CURLMOPT_MAX_TOTAL_CONNECTIONS.html) |
| `max_concurrent_streams`| `non_neg_integer()` | 100 | [docs](https://curl.se/libcurl/c/CURLMOPT_MAX_CONCURRENT_STREAMS.html) curl >= 7.67.0 |
diff --git a/c_src/katipo.c b/c_src/katipo.c
index b56410b..d35f2a0 100644
--- a/c_src/katipo.c
+++ b/c_src/katipo.c
@@ -58,9 +58,6 @@
#define K_CURLAUTH_NTLM 103
#define K_CURLAUTH_NEGOTIATE 104
-struct bufferevent *to_erlang;
-struct bufferevent *from_erlang;
-
typedef struct _GlobalInfo {
struct event_base *evbase;
struct event *timer_event;
@@ -69,6 +66,8 @@ typedef struct _GlobalInfo {
int still_running;
size_t to_get;
curl_version_info_data *ver;
+ struct bufferevent *to_erlang;
+ struct bufferevent *from_erlang;
} GlobalInfo;
typedef struct _ConnInfo {
@@ -78,6 +77,7 @@ typedef struct _ConnInfo {
erlang_ref *ref;
char *memory;
size_t size;
+ size_t capacity;
GlobalInfo *global;
char error[CURL_ERROR_SIZE];
size_t num_headers;
@@ -400,18 +400,18 @@ static void mcode_or_die(const char *where, CURLMcode code) {
}
}
-static void send_to_erlang(char *buffer, size_t buffer_len) {
+static void send_to_erlang(GlobalInfo *global, char *buffer, size_t buffer_len) {
u_int32_t erl_pkt_len;
char size_buf[4];
erl_pkt_len = htonl((uint32_t)buffer_len);
(void)memcpy(size_buf, &erl_pkt_len, sizeof(erl_pkt_len));
- if (bufferevent_write(to_erlang, size_buf, sizeof(size_buf)) < 0) {
+ if (bufferevent_write(global->to_erlang, size_buf, sizeof(size_buf)) < 0) {
errx(2, "bufferevent_write");
}
- if (bufferevent_write(to_erlang, buffer, buffer_len) < 0) {
+ if (bufferevent_write(global->to_erlang, buffer, buffer_len) < 0) {
errx(2, "bufferevent_write");
}
}
@@ -521,7 +521,7 @@ static void send_ok_to_erlang(ConnInfo *conn) {
encode_metrics(&result, conn);
- send_to_erlang(result.buff, result.index);
+ send_to_erlang(conn->global, result.buff, result.index);
ei_x_free(&result);
}
@@ -551,7 +551,7 @@ static void send_error_to_erlang(CURLcode curl_code, ConnInfo *conn) {
encode_metrics(&result, conn);
- send_to_erlang(result.buff, result.index);
+ send_to_erlang(conn->global, result.buff, result.index);
ei_x_free(&result);
}
@@ -690,13 +690,20 @@ static int multi_timer_cb(CURLM *multi, long timeout_ms, void *userp) {
static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data) {
size_t realsize = size * nmemb;
ConnInfo *conn = (ConnInfo *)data;
- char *new_memory;
- new_memory = (char *)realloc(conn->memory, conn->size + realsize);
- if (new_memory == NULL) {
- return 0;
+ size_t needed = conn->size + realsize;
+ if (needed > conn->capacity) {
+ size_t new_cap = conn->capacity ? conn->capacity : 4096;
+ while (new_cap < needed) {
+ new_cap *= 2;
+ }
+ char *new_memory = (char *)realloc(conn->memory, new_cap);
+ if (new_memory == NULL) {
+ return 0;
+ }
+ conn->memory = new_memory;
+ conn->capacity = new_cap;
}
- conn->memory = new_memory;
memcpy(&(conn->memory[conn->size]), ptr, realsize);
conn->size += realsize;
@@ -779,8 +786,9 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers,
conn = calloc(1, sizeof(*conn));
conn->error[0] = '\0';
- conn->memory = (char *)malloc(1);
+ conn->memory = NULL;
conn->size = 0;
+ conn->capacity = 0;
conn->num_headers = 0;
conn->resp_headers = NULL;
@@ -952,46 +960,272 @@ static void new_conn(long method, char *url, struct curl_slist *req_headers,
mcode_or_die("new_conn: curl_multi_add_handle", rc);
}
+static void send_parse_error_to_erlang(GlobalInfo *global,
+ erlang_pid *pid, erlang_ref *ref,
+ const char *msg) {
+ ei_x_buff result;
+ size_t msg_len = strlen(msg);
+
+ if (ei_x_new_with_version(&result) ||
+ ei_x_encode_tuple_header(&result, 2) ||
+ ei_x_encode_atom(&result, "error") ||
+ ei_x_encode_tuple_header(&result, 2) ||
+ ei_x_encode_tuple_header(&result, 2) ||
+ ei_x_encode_pid(&result, pid) ||
+ ei_x_encode_ref(&result, ref) ||
+ ei_x_encode_tuple_header(&result, 3) ||
+ ei_x_encode_atom(&result, "bad_opts") ||
+ ei_x_encode_binary(&result, msg, msg_len) ||
+ ei_x_encode_empty_list(&result)) {
+ errx(2, "Failed to encode parse error");
+ }
+
+ send_to_erlang(global, result.buff, result.index);
+ ei_x_free(&result);
+}
+
+static int parse_slist(char *buf, int *index, struct curl_slist **out) {
+ int num_items;
+ int erl_type;
+ int size;
+ long sizel;
+
+ if (ei_decode_list_header(buf, index, &num_items)) {
+ return -1;
+ }
+ *out = NULL;
+ for (int i = 0; i < num_items; i++) {
+ if (ei_get_type(buf, index, &erl_type, &size)) {
+ return -1;
+ }
+ char *item = (char *)malloc(size + 1);
+ if (ei_decode_binary(buf, index, item, &sizel)) {
+ free(item);
+ return -1;
+ }
+ item[size] = '\0';
+ *out = curl_slist_append(*out, item);
+ free(item);
+ }
+ if (num_items > 0 && ei_skip_term(buf, index)) {
+ return -1;
+ }
+ return 0;
+}
+
+static int parse_body(char *buf, int *index, char **out_data, int *out_size) {
+ int erl_type;
+ int size;
+ int index_prev;
+
+ if (ei_get_type(buf, index, &erl_type, &size)) {
+ return -1;
+ }
+ index_prev = *index;
+ if (ei_decode_iodata(buf, index, out_size, NULL)) {
+ return -1;
+ }
+ *index = index_prev;
+ *out_data = (char *)malloc(*out_size);
+ if (ei_decode_iodata(buf, index, out_size, *out_data)) {
+ free(*out_data);
+ *out_data = NULL;
+ return -1;
+ }
+ return 0;
+}
+
+static int parse_eopts(char *buf, int *index, EasyOpts *eopts) {
+ int num_eopts;
+ int arity;
+ int erl_type;
+ int size;
+ long eopt;
+ long eopt_long;
+ long sizel;
+ char *eopt_binary;
+
+ if (ei_decode_list_header(buf, index, &num_eopts)) {
+ return -1;
+ }
+ for (int i = 0; i < num_eopts; i++) {
+ if (ei_decode_tuple_header(buf, index, &arity) ||
+ ei_decode_long(buf, index, &eopt)) {
+ return -1;
+ }
+
+ if (ei_get_type(buf, index, &erl_type, &size)) {
+ return -1;
+ }
+ switch (erl_type) {
+ case ERL_SMALL_INTEGER_EXT:
+ case ERL_INTEGER_EXT:
+ if (ei_decode_long(buf, index, &eopt_long)) {
+ return -1;
+ }
+ switch (eopt) {
+ case K_CURLOPT_CONNECTTIMEOUT_MS:
+ eopts->curlopt_connecttimeout_ms = eopt_long;
+ break;
+ case K_CURLOPT_FOLLOWLOCATION:
+ eopts->curlopt_followlocation = eopt_long;
+ break;
+ case K_CURLOPT_SSL_VERIFYHOST:
+ eopts->curlopt_ssl_verifyhost = eopt_long;
+ break;
+ case K_CURLOPT_SSL_VERIFYPEER:
+ eopts->curlopt_ssl_verifypeer = eopt_long;
+ break;
+ case K_CURLOPT_TIMEOUT_MS:
+ eopts->curlopt_timeout_ms = eopt_long;
+ break;
+ case K_CURLOPT_MAXREDIRS:
+ eopts->curlopt_maxredirs = eopt_long;
+ break;
+ case K_CURLOPT_TCP_FASTOPEN:
+ eopts->curlopt_tcp_fastopen = eopt_long;
+ break;
+ case K_CURLOPT_HTTP_AUTH:
+ if (eopt_long == K_CURLAUTH_BASIC) {
+ eopts->curlopt_http_auth = CURLAUTH_BASIC;
+ } else if (eopt_long == K_CURLAUTH_DIGEST) {
+ eopts->curlopt_http_auth = CURLAUTH_DIGEST;
+ } else if (eopt_long == K_CURLAUTH_NTLM) {
+ eopts->curlopt_http_auth = CURLAUTH_NTLM;
+ } else if (eopt_long == K_CURLAUTH_NEGOTIATE) {
+ eopts->curlopt_http_auth = CURLAUTH_NEGOTIATE;
+ } else if (eopt_long != K_CURLAUTH_UNDEFINED) {
+ return -1;
+ }
+ break;
+ case K_CURLOPT_HTTP_VERSION:
+ eopts->curlopt_http_version = eopt_long;
+ break;
+ case K_CURLOPT_VERBOSE:
+ eopts->curlopt_verbose = eopt_long;
+ break;
+ case K_CURLOPT_SSLVERSION:
+ eopts->curlopt_sslversion = eopt_long;
+ break;
+ case K_CURLOPT_DNS_CACHE_TIMEOUT:
+ eopts->curlopt_dns_cache_timeout = eopt_long;
+ break;
+ case K_CURLOPT_CA_CACHE_TIMEOUT:
+ eopts->curlopt_ca_cache_timeout = eopt_long;
+ break;
+ case K_CURLOPT_PIPEWAIT:
+ eopts->curlopt_pipewait = eopt_long;
+ break;
+ default:
+ break;
+ }
+ break;
+ case ERL_BINARY_EXT:
+ eopt_binary = (char *)malloc(size + 1);
+ if (ei_decode_binary(buf, index, eopt_binary, &sizel)) {
+ free(eopt_binary);
+ return -1;
+ }
+ eopt_binary[size] = '\0';
+ switch (eopt) {
+ case K_CURLOPT_CAPATH:
+ eopts->curlopt_capath = eopt_binary;
+ break;
+ case K_CURLOPT_CACERT:
+ eopts->curlopt_cacert = eopt_binary;
+ break;
+ case K_CURLOPT_USERNAME:
+ eopts->curlopt_username = eopt_binary;
+ break;
+ case K_CURLOPT_PASSWORD:
+ eopts->curlopt_password = eopt_binary;
+ break;
+ case K_CURLOPT_PROXY:
+ eopts->curlopt_proxy = eopt_binary;
+ break;
+ case K_CURLOPT_INTERFACE:
+ eopts->curlopt_interface = eopt_binary;
+ break;
+ case K_CURLOPT_UNIX_SOCKET_PATH:
+ eopts->curlopt_unix_socket_path = eopt_binary;
+ break;
+ case K_CURLOPT_DOH_URL:
+ eopts->curlopt_doh_url = eopt_binary;
+ break;
+ case K_CURLOPT_SSLCERT:
+ eopts->curlopt_sslcert = eopt_binary;
+ break;
+ case K_CURLOPT_SSLKEY:
+ eopts->curlopt_sslkey = eopt_binary;
+ break;
+ case K_CURLOPT_SSLKEY_BLOB:
+ eopts->curlopt_sslkey_blob = eopt_binary;
+ eopts->curlopt_sslkey_blob_size = size;
+ break;
+ case K_CURLOPT_KEYPASSWD:
+ eopts->curlopt_keypasswd = eopt_binary;
+ break;
+ case K_CURLOPT_USERPWD:
+ eopts->curlopt_userpwd = eopt_binary;
+ break;
+ default:
+ free(eopt_binary);
+ }
+ break;
+ case ERL_ATOM_EXT:
+ if (ei_skip_term(buf, index)) {
+ return -1;
+ }
+ break;
+ default:
+ return -1;
+ }
+ }
+
+ if (num_eopts > 0 && ei_skip_term(buf, index)) {
+ return -1;
+ }
+ return 0;
+}
+
+static void free_eopts(EasyOpts *eopts) {
+ free(eopts->curlopt_capath);
+ free(eopts->curlopt_cacert);
+ free(eopts->curlopt_username);
+ free(eopts->curlopt_password);
+ free(eopts->curlopt_proxy);
+ free(eopts->curlopt_interface);
+ free(eopts->curlopt_unix_socket_path);
+ free(eopts->curlopt_doh_url);
+ free(eopts->curlopt_sslcert);
+ free(eopts->curlopt_sslkey);
+ free(eopts->curlopt_sslkey_blob);
+ free(eopts->curlopt_keypasswd);
+ free(eopts->curlopt_userpwd);
+}
+
static void erl_input(struct bufferevent *ev, void *arg) {
u_int32_t len;
size_t data_read;
char *buf;
int index;
- int index_prev;
int version;
int arity;
int erl_type;
int size;
long sizel;
- erlang_pid *pid;
- erlang_ref *ref;
long method;
- char *url;
GlobalInfo *global = (GlobalInfo *)arg;
- struct evbuffer *input = bufferevent_get_input(from_erlang);
- struct curl_slist *req_headers = NULL;
- struct curl_slist *req_cookies = NULL;
- char *header;
- char *cookie;
- int num_headers;
- int num_cookies;
- int i;
- char *post_data;
- int post_data_size;
- EasyOpts eopts;
- int num_eopts;
- long eopt;
- long eopt_long;
- char* eopt_binary = NULL;
+ struct evbuffer *input = bufferevent_get_input(global->from_erlang);
while (global->to_get > 0 || evbuffer_get_length(input) > sizeof(len)) {
if (global->to_get > 0) {
len = global->to_get;
global->to_get = 0;
} else {
- if (bufferevent_read(from_erlang, &len, sizeof(len)) != sizeof(len)) {
- errx(2, "Couldn't allocate len");
+ if (bufferevent_read(global->from_erlang, &len, sizeof(len)) != sizeof(len)) {
+ errx(2, "Couldn't read packet length");
}
len = ntohl(len);
}
@@ -1001,94 +1235,63 @@ static void erl_input(struct bufferevent *ev, void *arg) {
break;
}
- buf = (char *)malloc(len);
+ /* Peek at first byte — valid ETF always starts with version tag 131 */
+ {
+ unsigned char first_byte;
+ if (evbuffer_copyout(input, &first_byte, 1) != 1 || first_byte != 131) {
+ errx(2, "Invalid packet: not an Erlang term (first byte: %u)", first_byte);
+ }
+ }
- data_read = bufferevent_read(from_erlang, buf, len);
+ buf = (char *)malloc(len);
+ data_read = bufferevent_read(global->from_erlang, buf, len);
if (data_read != len) {
errx(2, "Wanted to read %u bytes data but got %zu", len, data_read);
}
- index = 0;
+ /* Per-request state — initialized here so goto cleanup can free safely */
+ erlang_pid *pid = NULL;
+ erlang_ref *ref = NULL;
+ int have_identity = 0;
+ char *url = NULL;
+ struct curl_slist *req_headers = NULL;
+ struct curl_slist *req_cookies = NULL;
+ char *post_data = NULL;
+ int post_data_size = 0;
+ EasyOpts eopts = {0};
+ index = 0;
pid = malloc(sizeof(*pid));
ref = malloc(sizeof(*ref));
+
if (ei_decode_version(buf, &index, &version) ||
ei_decode_tuple_header(buf, &index, &arity) ||
ei_decode_pid(buf, &index, pid) ||
ei_decode_ref(buf, &index, ref) ||
ei_decode_long(buf, &index, &method) ||
ei_get_type(buf, &index, &erl_type, &size)) {
- errx(2, "Couldn't read req");
+ goto cleanup;
}
+ have_identity = 1;
url = (char *)malloc(size + 1);
-
if (ei_decode_binary(buf, &index, url, &sizel)) {
- errx(2, "Couldn't read url");
+ goto cleanup;
}
-
url[size] = '\0';
- if (ei_decode_list_header(buf, &index, &num_headers)) {
- errx(2, "Couldn't decode headers length");
- }
- req_headers = NULL;
- for (i = 0; i < num_headers; i++) {
- if (ei_get_type(buf, &index, &erl_type, &size)) {
- errx(2, "Couldn't read header size");
- }
- header = (char *)malloc(size + 1);
- if (ei_decode_binary(buf, &index, header, &sizel)) {
- errx(2, "Couldn't read header");
- }
- header[size] = '\0';
- req_headers = curl_slist_append(req_headers, header);
- free(header);
- }
-
- if (num_headers > 0 && ei_skip_term(buf, &index)) {
- errx(2, "Couldn't skip empty list");
- }
-
- if (ei_decode_list_header(buf, &index, &num_cookies)) {
- errx(2, "Couldn't decode cookies length");
- }
- req_cookies = NULL;
- for (i = 0; i < num_cookies; i++) {
- if (ei_get_type(buf, &index, &erl_type, &size)) {
- errx(2, "Couldn't read cookie size");
- }
- cookie = (char *)malloc(size + 1);
- if (ei_decode_binary(buf, &index, cookie, &sizel)) {
- errx(2, "Couldn't read cookie");
- }
- cookie[size] = '\0';
- req_cookies = curl_slist_append(req_cookies, cookie);
- free(cookie);
- }
-
- if (num_cookies > 0 && ei_skip_term(buf, &index)) {
- errx(2, "Couldn't skip empty list");
+ if (parse_slist(buf, &index, &req_headers) != 0) {
+ goto cleanup;
}
- if (ei_get_type(buf, &index, &erl_type, &size)) {
- errx(2, "Couldn't read req body size");
- }
-
- index_prev = index;
-
- if (ei_decode_iodata(buf, &index, &post_data_size, NULL)) {
- errx(2, "Couldn't read req body size");
+ if (parse_slist(buf, &index, &req_cookies) != 0) {
+ goto cleanup;
}
- index = index_prev;
- post_data = (char *)malloc(post_data_size);
-
- if (ei_decode_iodata(buf, &index, &post_data_size, post_data)) {
- errx(2, "Couldn't read req body");
+ if (parse_body(buf, &index, &post_data, &post_data_size) != 0) {
+ goto cleanup;
}
-
eopts = (EasyOpts){
.curlopt_connecttimeout_ms = 30000,
.curlopt_ssl_verifyhost = 2,
@@ -1097,156 +1300,34 @@ static void erl_input(struct bufferevent *ev, void *arg) {
.curlopt_maxredirs = 9,
.curlopt_http_auth = -1,
.curlopt_dns_cache_timeout = 60,
- .curlopt_ca_cache_timeout = 86400, /* 24 hours - libcurl default */
- .curlopt_pipewait = 1, /* Enable by default for better HTTP/2 multiplexing */
+ .curlopt_ca_cache_timeout = 86400,
+ .curlopt_pipewait = 1,
};
- if (ei_decode_list_header(buf, &index, &num_eopts)) {
- errx(2, "Couldn't decode eopts length");
- }
- for (i = 0; i < num_eopts; i++) {
- if (ei_decode_tuple_header(buf, &index, &arity) ||
- ei_decode_long(buf, &index, &eopt)) {
- errx(2, "Couldn't read eopt tuple");
- }
-
- if (ei_get_type(buf, &index, &erl_type, &size)) {
- errx(2, "Couldn't read eopt type");
- }
- switch (erl_type) {
- case ERL_SMALL_INTEGER_EXT:
- case ERL_INTEGER_EXT:
- if (ei_decode_long(buf, &index, &eopt_long)) {
- errx(2, "Couldn't read eopt long value");
- }
- switch (eopt) {
- case K_CURLOPT_CONNECTTIMEOUT_MS:
- eopts.curlopt_connecttimeout_ms = eopt_long;
- break;
- case K_CURLOPT_FOLLOWLOCATION:
- eopts.curlopt_followlocation = eopt_long;
- break;
- case K_CURLOPT_SSL_VERIFYHOST:
- eopts.curlopt_ssl_verifyhost = eopt_long;
- break;
- case K_CURLOPT_SSL_VERIFYPEER:
- eopts.curlopt_ssl_verifypeer = eopt_long;
- break;
- case K_CURLOPT_TIMEOUT_MS:
- eopts.curlopt_timeout_ms = eopt_long;
- break;
- case K_CURLOPT_MAXREDIRS:
- eopts.curlopt_maxredirs = eopt_long;
- break;
- case K_CURLOPT_TCP_FASTOPEN:
- eopts.curlopt_tcp_fastopen = eopt_long;
- break;
- case K_CURLOPT_HTTP_AUTH:
- if (eopt_long == K_CURLAUTH_BASIC) {
- eopts.curlopt_http_auth = CURLAUTH_BASIC;
- } else if (eopt_long == K_CURLAUTH_DIGEST) {
- eopts.curlopt_http_auth = CURLAUTH_DIGEST;
- } else if (eopt_long == K_CURLAUTH_NTLM) {
- eopts.curlopt_http_auth = CURLAUTH_NTLM;
- } else if (eopt_long == K_CURLAUTH_NEGOTIATE) {
- eopts.curlopt_http_auth = CURLAUTH_NEGOTIATE;
- } else if (eopt_long != K_CURLAUTH_UNDEFINED) {
- errx(2, "Unknown curlopt_http_auth value %ld", eopt_long);
- }
- break;
- case K_CURLOPT_HTTP_VERSION:
- eopts.curlopt_http_version = eopt_long;
- break;
- case K_CURLOPT_VERBOSE:
- eopts.curlopt_verbose = eopt_long;
- break;
- case K_CURLOPT_SSLVERSION:
- eopts.curlopt_sslversion = eopt_long;
- break;
- case K_CURLOPT_DNS_CACHE_TIMEOUT:
- eopts.curlopt_dns_cache_timeout = eopt_long;
- break;
- case K_CURLOPT_CA_CACHE_TIMEOUT:
- eopts.curlopt_ca_cache_timeout = eopt_long;
- break;
- case K_CURLOPT_PIPEWAIT:
- eopts.curlopt_pipewait = eopt_long;
- break;
- default:
- errx(2, "Unknown eopt long value %ld", eopt);
- }
- break;
- case ERL_BINARY_EXT:
- eopt_binary = (char *)malloc(size + 1);
- if (ei_decode_binary(buf, &index, eopt_binary, &sizel)) {
- errx(2, "Couldn't read eopt binary value");
- }
- eopt_binary[size] = '\0';
- switch (eopt) {
- case K_CURLOPT_CAPATH:
- eopts.curlopt_capath = eopt_binary;
- break;
- case K_CURLOPT_CACERT:
- eopts.curlopt_cacert = eopt_binary;
- break;
- case K_CURLOPT_USERNAME:
- eopts.curlopt_username = eopt_binary;
- break;
- case K_CURLOPT_PASSWORD:
- eopts.curlopt_password = eopt_binary;
- break;
- case K_CURLOPT_PROXY:
- eopts.curlopt_proxy = eopt_binary;
- break;
- case K_CURLOPT_INTERFACE:
- eopts.curlopt_interface = eopt_binary;
- break;
- case K_CURLOPT_UNIX_SOCKET_PATH:
- eopts.curlopt_unix_socket_path = eopt_binary;
- break;
- case K_CURLOPT_DOH_URL:
- eopts.curlopt_doh_url = eopt_binary;
- break;
- case K_CURLOPT_SSLCERT:
- eopts.curlopt_sslcert = eopt_binary;
- break;
- case K_CURLOPT_SSLKEY:
- eopts.curlopt_sslkey = eopt_binary;
- break;
- case K_CURLOPT_SSLKEY_BLOB:
- eopts.curlopt_sslkey_blob = eopt_binary;
- eopts.curlopt_sslkey_blob_size = size;
- break;
- case K_CURLOPT_KEYPASSWD:
- eopts.curlopt_keypasswd = eopt_binary;
- break;
- case K_CURLOPT_USERPWD:
- eopts.curlopt_userpwd = eopt_binary;
- break;
- default:
- errx(2, "Unknown eopt binary value %ld", eopt);
- }
- break;
- case ERL_ATOM_EXT:
- // assuming this is 'undefined' == NULL
- if (ei_skip_term(buf, &index)) {
- errx(2, "Couldn't skip eopt atom value");
- }
- break;
- default:
- errx(2, "Couldn't read eopt value '%c'", erl_type);
- break;
- }
- }
-
- if (num_eopts > 0 && ei_skip_term(buf, &index)) {
- errx(2, "Couldn't skip empty eopt list");
+ if (parse_eopts(buf, &index, &eopts) != 0) {
+ goto cleanup;
}
new_conn(method, url, req_headers, req_cookies, post_data,
post_data_size, eopts, pid, ref, arg);
free(buf);
+ continue;
+
+cleanup:
+ if (have_identity) {
+ send_parse_error_to_erlang(global, pid, ref, "Couldn't read req");
+ } else {
+ fprintf(stderr, "ERROR: Couldn't decode request identity (pid/ref)\n");
+ }
+ free(pid);
+ free(ref);
+ free(url);
+ free(post_data);
+ curl_slist_free_all(req_headers);
+ curl_slist_free_all(req_cookies);
+ free_eopts(&eopts);
+ free(buf);
}
}
@@ -1255,22 +1336,22 @@ static void erl_error(struct bufferevent *ev, short event, void *ud) {
}
static void erlang_init(GlobalInfo *global) {
- from_erlang =
+ global->from_erlang =
bufferevent_socket_new(global->evbase, STDIN_FILENO, 0);
- if (from_erlang == NULL) {
+ if (global->from_erlang == NULL) {
errx(2, "bufferevent_socket_new");
}
- bufferevent_setcb(from_erlang, erl_input, NULL, erl_error, global);
+ bufferevent_setcb(global->from_erlang, erl_input, NULL, erl_error, global);
- to_erlang = bufferevent_socket_new(global->evbase, STDOUT_FILENO, 0);
- if (to_erlang == NULL) {
+ global->to_erlang = bufferevent_socket_new(global->evbase, STDOUT_FILENO, 0);
+ if (global->to_erlang == NULL) {
errx(2, "bufferevent_socket_new");
}
- bufferevent_setcb(to_erlang, NULL, NULL, erl_error, global);
+ bufferevent_setcb(global->to_erlang, NULL, NULL, erl_error, global);
- bufferevent_setwatermark(from_erlang, EV_READ, 4, 0);
- bufferevent_enable(from_erlang, EV_READ);
- bufferevent_enable(to_erlang, EV_WRITE);
+ bufferevent_setwatermark(global->from_erlang, EV_READ, 4, 0);
+ bufferevent_enable(global->from_erlang, EV_READ);
+ bufferevent_enable(global->to_erlang, EV_WRITE);
}
int main(int argc, char **argv) {
@@ -1282,7 +1363,6 @@ int main(int argc, char **argv) {
struct option long_options[] = {
{ "pipelining", required_argument, 0, 'p' },
- { "max-pipeline-length", required_argument, 0, 'a' },
{ "max-total-connections", required_argument, 0, 'c' },
{ "max-concurrent-streams", required_argument, 0, 's' },
{ 0, 0, 0, 0 }
@@ -1339,30 +1419,25 @@ int main(int argc, char **argv) {
curl_multi_setopt(global.multi, CURLMOPT_TIMERDATA, &global);
while (1) {
- c = getopt_long(argc, argv, "pa:c:s:", long_options, &option_index);
+ c = getopt_long(argc, argv, "p:c:s:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 'p':
- pipelining = atoi(optarg);
+ pipelining = strtol(optarg, NULL, 10);
if (pipelining < 0 || pipelining > 2) {
errx(2, "Bad pipelining arg '%ld'\n", pipelining);
}
- curl_multi_setopt(global.multi, CURLMOPT_PIPELINING,
- atoi(optarg));
- break;
- case 'a':
- curl_multi_setopt(global.multi, CURLMOPT_MAX_PIPELINE_LENGTH,
- atoi(optarg));
+ curl_multi_setopt(global.multi, CURLMOPT_PIPELINING, pipelining);
break;
case 'c':
curl_multi_setopt(global.multi, CURLMOPT_MAX_TOTAL_CONNECTIONS,
- atoi(optarg));
+ strtol(optarg, NULL, 10));
break;
#if LIBCURL_VERSION_NUM >= 0x074300 /* 7.67.0 */
case 's':
curl_multi_setopt(global.multi, CURLMOPT_MAX_CONCURRENT_STREAMS,
- atoi(optarg));
+ strtol(optarg, NULL, 10));
break;
#endif
default:
@@ -1375,8 +1450,8 @@ int main(int argc, char **argv) {
event_base_dispatch(global.evbase);
/* Cleanup */
- bufferevent_free(from_erlang);
- bufferevent_free(to_erlang);
+ bufferevent_free(global.from_erlang);
+ bufferevent_free(global.to_erlang);
curl_multi_cleanup(global.multi);
curl_share_cleanup(global.shobject);
curl_global_cleanup();
diff --git a/src/katipo.erl b/src/katipo.erl
index 6de8913..a1bf048 100644
--- a/src/katipo.erl
+++ b/src/katipo.erl
@@ -264,13 +264,8 @@
bad_opts.
-type curlmopt() ::
- %% curlmopt_chunk_length_penalty_size |
- %% curlmopt_content_length_penalty_size |
- %% curlmopt_max_host_connections |
- max_pipeline_length |
max_total_connections |
max_concurrent_streams |
- %% curlmopt_maxconnects |
pipelining.
%% curlmopt_pipelining_site_bl |
%% curlmopt_pipelining_server_bl |
@@ -406,8 +401,7 @@
sslversion_tlsv1_3.
%% Minimum SSL/TLS version to use
%% see [https://curl.se/libcurl/c/CURLOPT_SSLVERSION.html]
--type curlmopts() :: [{max_pipeline_length, non_neg_integer()} |
- {pipelining, pipelining()} |
+-type curlmopts() :: [{pipelining, pipelining()} |
{max_total_connections, non_neg_integer()} |
{max_concurrent_streams, non_neg_integer()}].
@@ -844,9 +838,6 @@ get_mopts(Opts) ->
end.
-spec mopt_supported({curlmopt(), any()}) -> false | {true, any()}.
-mopt_supported({max_pipeline_length, Val})
- when is_integer(Val) andalso Val >= 0 ->
- {true, "--max-pipeline-length " ++ integer_to_list(Val)};
mopt_supported({pipelining, nothing}) ->
{true, "--pipelining 0"};
mopt_supported({pipelining, http1}) ->
diff --git a/test/katipo_SUITE.erl b/test/katipo_SUITE.erl
index 114b79c..d6093da 100644
--- a/test/katipo_SUITE.erl
+++ b/test/katipo_SUITE.erl
@@ -176,10 +176,10 @@ groups() ->
{pool, [],
[pool_start_stop,
worker_death,
+ port_garbage_input,
port_death,
port_late_response,
pool_opts,
- max_pipeline_length,
max_concurrent_streams]},
{https, [parallel],
[verify_host_verify_peer_ok,
@@ -791,10 +791,12 @@ worker_death(Config) ->
end,
true = repeat_until_true(Fun3).
-port_death(Config) ->
+port_garbage_input(Config) ->
+ %% Sending garbage that bypasses {packet, 4} corrupts the pipe framing.
+ %% The port detects invalid ETF and exits. Supervisor recovers.
Url = httpbin_url(Config, <<"/get">>),
BaseOpts = ?config(httpbin_opts, Config),
- PoolName = this_process_will_be_killed,
+ PoolName = port_garbage_test,
PoolSize = 1,
{ok, _} = katipo_pool:start(PoolName, PoolSize),
WorkerName = wpool_pool:best_worker(PoolName),
@@ -814,6 +816,31 @@ port_death(Config) ->
true = repeat_until_true(Fun),
ok = katipo_pool:stop(PoolName).
+port_death(Config) ->
+ %% Killing the port OS process should be recovered by the supervisor
+ Url = httpbin_url(Config, <<"/get">>),
+ BaseOpts = ?config(httpbin_opts, Config),
+ PoolName = port_death_test,
+ PoolSize = 1,
+ {ok, _} = katipo_pool:start(PoolName, PoolSize),
+ WorkerName = wpool_pool:best_worker(PoolName),
+ WorkerPid = whereis(WorkerName),
+ {state, _, _, {state, Port, _}, _} = sys:get_state(WorkerPid),
+ {os_pid, OsPid} = erlang:port_info(Port, os_pid),
+ os:cmd("kill -9 " ++ integer_to_list(OsPid)),
+ Fun = fun() ->
+ WorkerName2 = wpool_pool:best_worker(PoolName),
+ WorkerPid2 = whereis(WorkerName2),
+ case sys:get_state(WorkerPid2) of
+ {state, _, _, {state, Port2, _}, _} when Port =/= Port2 ->
+ {ok, #{status := 200}} =
+ katipo:get(PoolName, Url, BaseOpts),
+ true
+ end
+ end,
+ true = repeat_until_true(Fun),
+ ok = katipo_pool:stop(PoolName).
+
port_late_response(Config) ->
Url = httpbin_url(Config, <<"/delay/1">>),
BaseOpts = ?config(httpbin_opts, Config),
@@ -827,21 +854,11 @@ pool_opts(_) ->
PoolName = pool_opts,
PoolSize = 1,
PoolOpts = [{pipelining, multiplex},
- {max_pipeline_length, 5},
{max_total_connections, 10},
{ignore_junk_opt, hithere}],
{error, _} = katipo_pool:start(PoolName, PoolSize, PoolOpts),
ok = katipo_pool:stop(PoolName).
-max_pipeline_length(_) ->
- PoolName = pool_opts,
- PoolSize = 1,
- PoolOpts = [{pipelining, multiplex},
- {max_pipeline_length, 5},
- {max_total_connections, 10}],
- {ok, _} = katipo_pool:start(PoolName, PoolSize, PoolOpts),
- ok = katipo_pool:stop(PoolName).
-
max_concurrent_streams(_) ->
PoolName = pool_max_streams,
PoolSize = 1,