From 88bc3ff9403ee609ca51e75f6bea9e439d0a0857 Mon Sep 17 00:00:00 2001 From: steve-chavez Date: Tue, 4 Nov 2025 22:02:13 -0500 Subject: [PATCH 1/2] chore: test against pg 18 stable --- nix/xpg.nix | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nix/xpg.nix b/nix/xpg.nix index acfa7ea..65ccaae 100644 --- a/nix/xpg.nix +++ b/nix/xpg.nix @@ -3,8 +3,8 @@ let dep = fetchFromGitHub { owner = "steve-chavez"; repo = "xpg"; - rev = "v1.6.0"; - sha256 = "sha256-NsdAmsYIRH/DWIZp93AHGYdPiJOztUIUSYcPikeebvw="; + rev = "v1.8.0"; + sha256 = "sha256-ltS2bprvzrmaBjzMmIiSdJh5P3gBV/blzFpYazevv8g="; }; xpg = import dep; in From c6b5f9d001d926902e417287be93f0c41b94e99c Mon Sep 17 00:00:00 2001 From: steve-chavez Date: Tue, 4 Nov 2025 22:33:39 -0500 Subject: [PATCH 2/2] refactor: use clang-format style You can run this with: ```bash net-style ``` The style is also checked on CI. --- .clang-format | 44 ++++- .github/workflows/main.yml | 21 +++ shell.nix | 11 ++ src/core.c | 236 ++++++++++++------------- src/core.h | 31 ++-- src/errors.c | 5 + src/errors.h | 52 +++--- src/event.c | 118 +++++++------ src/event.h | 22 +-- src/pg_prelude.h | 42 ++--- src/util.c | 80 +++++---- src/worker.c | 343 +++++++++++++++++-------------------- 12 files changed, 529 insertions(+), 476 deletions(-) diff --git a/.clang-format b/.clang-format index 80d3293..b4afda8 100644 --- a/.clang-format +++ b/.clang-format @@ -1,2 +1,42 @@ -BasedOnStyle: LLVM -IndentWidth: 4 +AlignAfterOpenBracket: Align +AlignConsecutiveAssignments: true +AlignConsecutiveDeclarations: true +AlignOperands: true +AllowAllParametersOfDeclarationOnNextLine: false +AllowShortFunctionsOnASingleLine: Empty +AllowShortLoopsOnASingleLine: false +AlwaysBreakAfterDefinitionReturnType: None +AlwaysBreakAfterReturnType: None +AlwaysBreakBeforeMultilineStrings: false +BinPackArguments: true +BinPackParameters: true +ColumnLimit: 100 +IndentPPDirectives: AfterHash +MaxEmptyLinesToKeep: 1 +PointerAlignment: Right +SpaceBeforeAssignmentOperators: true +SpaceBeforeParens: ControlStatements +SpaceInEmptyParentheses: false +SpacesBeforeTrailingComments: 1 +BracedInitializerIndentWidth: 2 + +# includes +IncludeBlocks: Preserve +SortIncludes: true + +# indentation +IndentWidth: 2 +TabWidth: 2 +UseTab: Never + +# if +AllowShortIfStatementsOnASingleLine: true + +# case +IndentCaseLabels: false +AllowShortCaseLabelsOnASingleLine: true +AlignConsecutiveShortCaseStatements: + Enabled: true + AcrossEmptyLines: true + AcrossComments: true + AlignCaseColons: true diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index bf834e1..ac4f647 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -105,3 +105,24 @@ jobs: with: github-token: ${{ secrets.GITHUB_TOKEN }} files: ./build-${{ matrix.pg-version }}/coverage.info + + style: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Install Nix + uses: cachix/install-nix-action@v31.5.1 + with: + nix_path: nixpkgs=channel:nixos-unstable + + - name: Use Cachix Cache + uses: cachix/cachix-action@v16 + with: + name: nxpg + authToken: ${{ secrets.CACHIX_AUTH_TOKEN }} + + - name: Run style check + run: nix-shell --run "net-style-check" diff --git a/shell.nix b/shell.nix index bc8dfa0..7af487c 100644 --- a/shell.nix +++ b/shell.nix @@ -14,6 +14,15 @@ mkShell { psycopg2 sqlalchemy ]; + style = + writeShellScriptBin "net-style" '' + ${clang-tools}/bin/clang-format -i src/* + ''; + styleCheck = + writeShellScriptBin "net-style-check" '' + ${clang-tools}/bin/clang-format -i src/* + ${git}/bin/git diff-index --exit-code HEAD -- '*.c' + ''; in [ xpg.xpg @@ -21,6 +30,8 @@ mkShell { nginxCustom.nginxScript curlWithGnuTls loadtest + style + styleCheck ]; shellHook = '' export HISTFILE=.history diff --git a/src/core.c b/src/core.c index 77528ef..b34375e 100644 --- a/src/core.c +++ b/src/core.c @@ -1,59 +1,58 @@ -#include #include -#include #include +#include +#include #include "pg_prelude.h" + #include "curl_prelude.h" + #include "core.h" -#include "event.h" #include "errors.h" +#include "event.h" static SPIPlanPtr del_response_plan = NULL; static SPIPlanPtr del_return_queue_plan = NULL; static SPIPlanPtr ins_response_plan = NULL; -static size_t -body_cb(void *contents, size_t size, size_t nmemb, void *userp) -{ - CurlHandle *handle = (CurlHandle*) userp; - size_t realsize = size * nmemb; - appendBinaryStringInfo(handle->body, (const char*)contents, (int)realsize); +static size_t body_cb(void *contents, size_t size, size_t nmemb, void *userp) { + CurlHandle *handle = (CurlHandle *)userp; + size_t realsize = size * nmemb; + appendBinaryStringInfo(handle->body, (const char *)contents, (int)realsize); return realsize; } -static struct curl_slist *pg_text_array_to_slist(ArrayType *array, - struct curl_slist *headers) { - ArrayIterator iterator; - Datum value; - bool isnull; - char *hdr; - - iterator = array_create_iterator(array, 0, NULL); +static struct curl_slist *pg_text_array_to_slist(ArrayType *array, struct curl_slist *headers) { + ArrayIterator iterator; + Datum value; + bool isnull; + char *hdr; - while (array_iterate(iterator, &value, &isnull)) { - if (isnull) { - continue; - } + iterator = array_create_iterator(array, 0, NULL); - hdr = TextDatumGetCString(value); - EREPORT_CURL_SLIST_APPEND(headers, hdr); - pfree(hdr); + while (array_iterate(iterator, &value, &isnull)) { + if (isnull) { + continue; } - array_free_iterator(iterator); - return headers; + hdr = TextDatumGetCString(value); + EREPORT_CURL_SLIST_APPEND(headers, hdr); + pfree(hdr); + } + array_free_iterator(iterator); + + return headers; } -void init_curl_handle(CurlHandle *handle, RequestQueueRow row){ - handle->id = row.id; - handle->body = makeStringInfo(); +void init_curl_handle(CurlHandle *handle, RequestQueueRow row) { + handle->id = row.id; + handle->body = makeStringInfo(); handle->ez_handle = curl_easy_init(); handle->timeout_milliseconds = row.timeout_milliseconds; if (!row.headersBin.isnull) { - ArrayType *pgHeaders = DatumGetArrayTypeP(row.headersBin.value); + ArrayType *pgHeaders = DatumGetArrayTypeP(row.headersBin.value); struct curl_slist *request_headers = NULL; request_headers = pg_text_array_to_slist(pgHeaders, request_headers); @@ -69,7 +68,8 @@ void init_curl_handle(CurlHandle *handle, RequestQueueRow row){ handle->method = TextDatumGetCString(row.method); - if (strcasecmp(handle->method, "GET") != 0 && strcasecmp(handle->method, "POST") != 0 && strcasecmp(handle->method, "DELETE") != 0) { + if (strcasecmp(handle->method, "GET") != 0 && strcasecmp(handle->method, "POST") != 0 && + strcasecmp(handle->method, "DELETE") != 0) { ereport(ERROR, errmsg("Unsupported request method %s", handle->method)); } @@ -83,8 +83,7 @@ void init_curl_handle(CurlHandle *handle, RequestQueueRow row){ if (strcasecmp(handle->method, "POST") == 0) { if (handle->req_body) { EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDS, handle->req_body); - } - else { + } else { EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POST, 1L); EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_POSTFIELDSIZE, 0L); } @@ -102,11 +101,10 @@ void init_curl_handle(CurlHandle *handle, RequestQueueRow row){ EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_HEADER, 0L); EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_URL, handle->url); EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_HTTPHEADER, handle->request_headers); - EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_TIMEOUT_MS, (long) handle->timeout_milliseconds); + EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_TIMEOUT_MS, (long)handle->timeout_milliseconds); EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PRIVATE, handle); - EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_FOLLOWLOCATION, (long) true); - if (log_min_messages <= DEBUG2) - EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_VERBOSE, 1L); + EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_FOLLOWLOCATION, (long)true); + if (log_min_messages <= DEBUG2) EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_VERBOSE, 1L); #if LIBCURL_VERSION_NUM >= 0x075500 /* libcurl 7.85.0 */ EREPORT_CURL_SETOPT(handle->ez_handle, CURLOPT_PROTOCOLS_STR, "http,https"); #else @@ -114,16 +112,16 @@ void init_curl_handle(CurlHandle *handle, RequestQueueRow row){ #endif } -void set_curl_mhandle(WorkerState *wstate){ +void set_curl_mhandle(WorkerState *wstate) { EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_SOCKETFUNCTION, multi_socket_cb); EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_SOCKETDATA, wstate); EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_TIMERFUNCTION, multi_timer_cb); EREPORT_CURL_MULTI_SETOPT(wstate->curl_mhandle, CURLMOPT_TIMERDATA, wstate); } -uint64 delete_expired_responses(char *ttl, int batch_size){ +uint64 delete_expired_responses(char *ttl, int batch_size) { if (del_response_plan == NULL) { - SPIPlanPtr tmp = SPI_prepare("\ + SPIPlanPtr tmp = SPI_prepare("\ WITH\ rows AS (\ SELECT ctid\ @@ -134,35 +132,34 @@ uint64 delete_expired_responses(char *ttl, int batch_size){ )\ DELETE FROM net._http_response r\ USING rows WHERE r.ctid = rows.ctid", - 2, - (Oid[]){INTERVALOID, INT4OID}); - if (tmp == NULL) - ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result))); - - del_response_plan = SPI_saveplan(tmp); - if (del_response_plan == NULL) - ereport(ERROR, errmsg("SPI_saveplan failed")); + 2, (Oid[]){INTERVALOID, INT4OID}); + if (tmp == NULL) + ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result))); + + del_response_plan = SPI_saveplan(tmp); + if (del_response_plan == NULL) ereport(ERROR, errmsg("SPI_saveplan failed")); } - int ret_code = SPI_execute_plan(del_response_plan, - (Datum[]){ - DirectFunctionCall3(interval_in, CStringGetDatum(ttl), ObjectIdGetDatum(InvalidOid), Int32GetDatum(-1)) - , Int32GetDatum(batch_size) - }, NULL, false, 0); + int ret_code = SPI_execute_plan( + del_response_plan, + (Datum[]){DirectFunctionCall3(interval_in, CStringGetDatum(ttl), ObjectIdGetDatum(InvalidOid), + Int32GetDatum(-1)), + Int32GetDatum(batch_size)}, + NULL, false, 0); uint64 affected_rows = SPI_processed; - if (ret_code != SPI_OK_DELETE) - { - ereport(ERROR, errmsg("Error expiring response table rows: %s", SPI_result_code_string(ret_code))); + if (ret_code != SPI_OK_DELETE) { + ereport(ERROR, + errmsg("Error expiring response table rows: %s", SPI_result_code_string(ret_code))); } return affected_rows; } -uint64 consume_request_queue(const int batch_size){ +uint64 consume_request_queue(const int batch_size) { if (del_return_queue_plan == NULL) { - SPIPlanPtr tmp = SPI_prepare("\ + SPIPlanPtr tmp = SPI_prepare("\ WITH\ rows AS (\ SELECT id\ @@ -173,29 +170,29 @@ uint64 consume_request_queue(const int batch_size){ DELETE FROM net.http_request_queue q\ USING rows WHERE q.id = rows.id\ RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body", - 1, - (Oid[]){INT4OID}); + 1, (Oid[]){INT4OID}); - if (tmp == NULL) - ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result))); + if (tmp == NULL) + ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result))); - del_return_queue_plan = SPI_saveplan(tmp); - if (del_return_queue_plan == NULL) - ereport(ERROR, errmsg("SPI_saveplan failed")); + del_return_queue_plan = SPI_saveplan(tmp); + if (del_return_queue_plan == NULL) ereport(ERROR, errmsg("SPI_saveplan failed")); } - int ret_code = SPI_execute_plan(del_return_queue_plan, (Datum[]){Int32GetDatum(batch_size)}, NULL, false, 0); + int ret_code = + SPI_execute_plan(del_return_queue_plan, (Datum[]){Int32GetDatum(batch_size)}, NULL, false, 0); if (ret_code != SPI_OK_DELETE_RETURNING) - ereport(ERROR, errmsg("Error getting http request queue: %s", SPI_result_code_string(ret_code))); + ereport(ERROR, + errmsg("Error getting http request queue: %s", SPI_result_code_string(ret_code))); return SPI_processed; } -// This has an implicit dependency on the execution of delete_return_request_queue, -// unfortunately we're not able to make this dependency explicit -// due to the design of SPI (which uses global variables) -RequestQueueRow get_request_queue_row(HeapTuple spi_tupval, TupleDesc spi_tupdesc){ +// This has an implicit dependency on the execution of +// delete_return_request_queue, unfortunately we're not able to make this +// dependency explicit due to the design of SPI (which uses global variables) +RequestQueueRow get_request_queue_row(HeapTuple spi_tupval, TupleDesc spi_tupdesc) { bool tupIsNull = false; int64 id = DatumGetInt64(SPI_getbinval(spi_tupval, spi_tupdesc, 1, &tupIsNull)); @@ -210,31 +207,27 @@ RequestQueueRow get_request_queue_row(HeapTuple spi_tupval, TupleDesc spi_tupdes int32 timeout_milliseconds = DatumGetInt32(SPI_getbinval(spi_tupval, spi_tupdesc, 4, &tupIsNull)); EREPORT_NULL_ATTR(tupIsNull, timeout_milliseconds); - NullableDatum headersBin = { - .value = SPI_getbinval(spi_tupval, spi_tupdesc, 5, &tupIsNull), - .isnull = tupIsNull - }; + NullableDatum headersBin = {.value = SPI_getbinval(spi_tupval, spi_tupdesc, 5, &tupIsNull), + .isnull = tupIsNull}; - NullableDatum bodyBin = { - .value = SPI_getbinval(spi_tupval, spi_tupdesc, 6, &tupIsNull), - .isnull = tupIsNull - }; + NullableDatum bodyBin = {.value = SPI_getbinval(spi_tupval, spi_tupdesc, 6, &tupIsNull), + .isnull = tupIsNull}; - return (RequestQueueRow){ - id, method, url, timeout_milliseconds, headersBin, bodyBin - }; + return (RequestQueueRow){id, method, url, timeout_milliseconds, headersBin, bodyBin}; } -static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){ +static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle) { struct curl_header *header, *prev = NULL; JsonbParseState *headers = NULL; (void)pushJsonbValue(&headers, WJB_BEGIN_OBJECT, NULL); - while((header = curl_easy_nextheader(ez_handle, CURLH_HEADER, 0, prev))) { - JsonbValue key = {.type = jbvString, .val = {.string = {.val = header->name, .len = strlen(header->name)}}}; - JsonbValue value = {.type = jbvString, .val = {.string = {.val = header->value, .len = strlen(header->value)}}}; - (void)pushJsonbValue(&headers, WJB_KEY, &key); + while ((header = curl_easy_nextheader(ez_handle, CURLH_HEADER, 0, prev))) { + JsonbValue key = {.type = jbvString, + .val = {.string = {.val = header->name, .len = strlen(header->name)}}}; + JsonbValue value = {.type = jbvString, + .val = {.string = {.val = header->value, .len = strlen(header->value)}}}; + (void)pushJsonbValue(&headers, WJB_KEY, &key); (void)pushJsonbValue(&headers, WJB_VALUE, &value); prev = header; } @@ -244,91 +237,90 @@ static Jsonb *jsonb_headers_from_curl_handle(CURL *ez_handle){ return jsonb_headers; } -void insert_response(CurlHandle *handle, CURLcode curl_return_code){ +void insert_response(CurlHandle *handle, CURLcode curl_return_code) { enum { nparams = 7 }; // using an enum because const size_t nparams doesn't compile Datum vals[nparams]; - char nulls[nparams]; MemSet(nulls, 'n', nparams); + char nulls[nparams]; + MemSet(nulls, 'n', nparams); - vals[0] = Int64GetDatum(handle->id); + vals[0] = Int64GetDatum(handle->id); nulls[0] = ' '; if (curl_return_code == CURLE_OK) { - Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(handle->ez_handle); - long res_http_status_code = 0; + Jsonb *jsonb_headers = jsonb_headers_from_curl_handle(handle->ez_handle); + long res_http_status_code = 0; EREPORT_CURL_GETINFO(handle->ez_handle, CURLINFO_RESPONSE_CODE, &res_http_status_code); - vals[1] = Int32GetDatum(res_http_status_code); + vals[1] = Int32GetDatum(res_http_status_code); nulls[1] = ' '; - if (handle->body && handle->body->data[0] != '\0'){ - vals[2] = CStringGetTextDatum(handle->body->data); + if (handle->body && handle->body->data[0] != '\0') { + vals[2] = CStringGetTextDatum(handle->body->data); nulls[2] = ' '; } - vals[3] = JsonbPGetDatum(jsonb_headers); + vals[3] = JsonbPGetDatum(jsonb_headers); nulls[3] = ' '; struct curl_header *hdr; - if (curl_easy_header(handle->ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) == CURLHE_OK){ - vals[4] = CStringGetTextDatum(hdr->value); + if (curl_easy_header(handle->ez_handle, "content-type", 0, CURLH_HEADER, -1, &hdr) == + CURLHE_OK) { + vals[4] = CStringGetTextDatum(hdr->value); nulls[4] = ' '; } - vals[5] = BoolGetDatum(false); + vals[5] = BoolGetDatum(false); nulls[5] = ' '; } else { - bool timed_out = curl_return_code == CURLE_OPERATION_TIMEDOUT; + bool timed_out = curl_return_code == CURLE_OPERATION_TIMEDOUT; char *error_msg = NULL; - if (timed_out){ + if (timed_out) { error_msg = detailed_timeout_strerror(handle->ez_handle, handle->timeout_milliseconds).msg; } else { - error_msg = (char *) curl_easy_strerror(curl_return_code); + error_msg = (char *)curl_easy_strerror(curl_return_code); } - vals[5] = BoolGetDatum(timed_out); + vals[5] = BoolGetDatum(timed_out); nulls[5] = ' '; - if (error_msg){ - vals[6] = CStringGetTextDatum(error_msg); + if (error_msg) { + vals[6] = CStringGetTextDatum(error_msg); nulls[6] = ' '; } } if (ins_response_plan == NULL) { - SPIPlanPtr tmp = SPI_prepare("\ + SPIPlanPtr tmp = SPI_prepare( + "\ insert into net._http_response(id, status_code, content, headers, content_type, timed_out, error_msg) values ($1, $2, $3, $4, $5, $6, $7)", - nparams, - (Oid[nparams]){INT8OID, INT4OID, TEXTOID, JSONBOID, TEXTOID, BOOLOID, TEXTOID}); + nparams, (Oid[nparams]){INT8OID, INT4OID, TEXTOID, JSONBOID, TEXTOID, BOOLOID, TEXTOID}); - if (tmp == NULL) - ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result))); + if (tmp == NULL) + ereport(ERROR, errmsg("SPI_prepare failed: %s", SPI_result_code_string(SPI_result))); - ins_response_plan = SPI_saveplan(tmp); - if (ins_response_plan == NULL) - ereport(ERROR, errmsg("SPI_saveplan failed")); + ins_response_plan = SPI_saveplan(tmp); + if (ins_response_plan == NULL) ereport(ERROR, errmsg("SPI_saveplan failed")); - SPI_freeplan(tmp); + SPI_freeplan(tmp); } int ret_code = SPI_execute_plan(ins_response_plan, vals, nulls, false, 0); - if (ret_code != SPI_OK_INSERT) - { + if (ret_code != SPI_OK_INSERT) { ereport(ERROR, errmsg("Error when inserting response: %s", SPI_result_code_string(ret_code))); } } -void pfree_handle(CurlHandle *handle){ +void pfree_handle(CurlHandle *handle) { pfree(handle->url); pfree(handle->method); - if(handle->req_body) - pfree(handle->req_body); + if (handle->req_body) pfree(handle->req_body); - if(handle->body) - destroyStringInfo(handle->body); + if (handle->body) destroyStringInfo(handle->body); - if(handle->request_headers) //curl_slist_free_all already handles the NULL case, but be explicit about it + if (handle->request_headers) // curl_slist_free_all already handles the NULL + // case, but be explicit about it curl_slist_free_all(handle->request_headers); } diff --git a/src/core.h b/src/core.h index a4ee370..dff2f67 100644 --- a/src/core.h +++ b/src/core.h @@ -12,32 +12,33 @@ typedef struct { pg_atomic_uint32 got_restart; pg_atomic_uint32 should_wake; pg_atomic_uint32 status; - Latch* shared_latch; + Latch *shared_latch; ConditionVariable cv; // required to publish the state of the worker to other backends int epfd; - CURLM *curl_mhandle; + CURLM *curl_mhandle; } WorkerState; // A row coming from the http_request_queue typedef struct { - int64 id; - Datum method; - Datum url; - int32 timeout_milliseconds; + int64 id; + Datum method; + Datum url; + int32 timeout_milliseconds; NullableDatum headersBin; NullableDatum bodyBin; } RequestQueueRow; -// The curl easy handle plus additional data, this acts for both the request and response cycle +// The curl easy handle plus additional data, this acts for both the request and +// response cycle typedef struct { - int64 id; - StringInfo body; - struct curl_slist* request_headers; - int32 timeout_milliseconds; - char *url; - char *req_body; - char *method; - CURL *ez_handle; + int64 id; + StringInfo body; + struct curl_slist *request_headers; + int32 timeout_milliseconds; + char *url; + char *req_body; + char *method; + CURL *ez_handle; } CurlHandle; uint64 delete_expired_responses(char *ttl, int batch_size); diff --git a/src/errors.c b/src/errors.c index 0a346bb..4ebc34a 100644 --- a/src/errors.c +++ b/src/errors.c @@ -1,7 +1,11 @@ #include "pg_prelude.h" + #include "curl_prelude.h" + #include "errors.h" +// disable clang-format as it only hurts readability here +// clang-format off /* * Show a more detailed error message when a timeout happens, which includes the DNS, TCP/SSL handshake and HTTP request/response time. An example message is like: * @@ -78,3 +82,4 @@ curl_timeout_msg detailed_timeout_strerror(CURL *ez_handle, int32 timeout_millis ); return result; } +// clang-format on diff --git a/src/errors.h b/src/errors.h index a14e8c6..16237fa 100644 --- a/src/errors.h +++ b/src/errors.h @@ -1,43 +1,41 @@ #ifndef ERRORS_H #define ERRORS_H -#define EREPORT_CURL_SETOPT(hdl, opt, prm) \ - do { \ - if (curl_easy_setopt(hdl, opt, prm) != CURLE_OK) \ - ereport(ERROR, errmsg("Could not curl_easy_setopt(%s)", #opt)); \ +#define EREPORT_CURL_SETOPT(hdl, opt, prm) \ + do { \ + if (curl_easy_setopt(hdl, opt, prm) != CURLE_OK) \ + ereport(ERROR, errmsg("Could not curl_easy_setopt(%s)", #opt)); \ } while (0) -#define EREPORT_CURL_GETINFO(hdl, opt, prm) \ - do { \ - if (curl_easy_getinfo(hdl, opt, prm) != CURLE_OK) \ - ereport(ERROR, errmsg("Could not curl_easy_getinfo(%s)", #opt)); \ +#define EREPORT_CURL_GETINFO(hdl, opt, prm) \ + do { \ + if (curl_easy_getinfo(hdl, opt, prm) != CURLE_OK) \ + ereport(ERROR, errmsg("Could not curl_easy_getinfo(%s)", #opt)); \ } while (0) -#define EREPORT_CURL_MULTI_SETOPT(hdl, opt, prm) \ - do { \ - if (curl_multi_setopt(hdl, opt, prm) != CURLM_OK) \ - ereport(ERROR, errmsg("Could not curl_multi_setopt(%s)", #opt)); \ +#define EREPORT_CURL_MULTI_SETOPT(hdl, opt, prm) \ + do { \ + if (curl_multi_setopt(hdl, opt, prm) != CURLM_OK) \ + ereport(ERROR, errmsg("Could not curl_multi_setopt(%s)", #opt)); \ } while (0) -#define EREPORT_CURL_SLIST_APPEND(list, str) \ - do { \ - struct curl_slist *new_list = curl_slist_append(list, str); \ - if (new_list == NULL) \ - ereport(ERROR, errmsg("curl_slist_append returned NULL")); \ - list = new_list; \ +#define EREPORT_CURL_SLIST_APPEND(list, str) \ + do { \ + struct curl_slist *new_list = curl_slist_append(list, str); \ + if (new_list == NULL) ereport(ERROR, errmsg("curl_slist_append returned NULL")); \ + list = new_list; \ } while (0) -#define EREPORT_NULL_ATTR(tupIsNull, attr) \ - do { \ - if (tupIsNull) \ - ereport(ERROR, errmsg("%s cannot be null", #attr)); \ +#define EREPORT_NULL_ATTR(tupIsNull, attr) \ + do { \ + if (tupIsNull) ereport(ERROR, errmsg("%s cannot be null", #attr)); \ } while (0) -#define EREPORT_MULTI(multi_call) \ - do { \ - CURLMcode code = multi_call; \ - if (code != CURLM_OK) \ - ereport(ERROR, errmsg("%s failed with %s", #multi_call, curl_multi_strerror(code))); \ +#define EREPORT_MULTI(multi_call) \ + do { \ + CURLMcode code = multi_call; \ + if (code != CURLM_OK) \ + ereport(ERROR, errmsg("%s failed with %s", #multi_call, curl_multi_strerror(code))); \ } while (0) #define CURL_TIMEOUT_MSG_SIZE 256 diff --git a/src/event.c b/src/event.c index 2663d15..3bb5df9 100644 --- a/src/event.c +++ b/src/event.c @@ -1,45 +1,49 @@ -#include #include +#include #include #include "pg_prelude.h" + #include "event.h" #ifdef WAIT_USE_EPOLL -static int timerfd = 0; +static int timerfd = 0; static bool timer_created = false; typedef struct epoll_event epoll_event; -typedef struct itimerspec itimerspec; +typedef struct itimerspec itimerspec; -inline int wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds){ +inline int wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds) { return epoll_wait(fd, events, maxevents, timeout_milliseconds); } -inline int event_monitor(){ +inline int event_monitor() { return epoll_create1(0); } -void ev_monitor_close(WorkerState *wstate){ +void ev_monitor_close(WorkerState *wstate) { close(wstate->epfd); close(timerfd); } -int multi_timer_cb(__attribute__ ((unused)) CURLM *multi, long timeout_ms, WorkerState *wstate) { +int multi_timer_cb(__attribute__((unused)) CURLM *multi, long timeout_ms, WorkerState *wstate) { elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms); - if (!timer_created){ + if (!timer_created) { timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0) { ereport(ERROR, errmsg("Failed to create timerfd")); } timerfd_settime(timerfd, 0, &(itimerspec){}, NULL); - epoll_ctl(wstate->epfd, EPOLL_CTL_ADD, timerfd, &(epoll_event){.events = EPOLLIN, .data.fd = timerfd}); + epoll_ctl(wstate->epfd, EPOLL_CTL_ADD, timerfd, + &(epoll_event){.events = EPOLLIN, .data.fd = timerfd}); timer_created = true; } + // disable clang-format as it only hurts readability here + // clang-format off itimerspec its = timeout_ms > 0 ? // assign the timeout normally @@ -57,6 +61,7 @@ int multi_timer_cb(__attribute__ ((unused)) CURLM *multi, long timeout_ms, Worke }: // libcurl passes a -1 to indicate the timer should be deleted (itimerspec){}; + // clang-format on int no_flags = 0; if (timerfd_settime(timerfd, no_flags, &its, NULL) < 0) { @@ -66,17 +71,19 @@ int multi_timer_cb(__attribute__ ((unused)) CURLM *multi, long timeout_ms, Worke return 0; } -int multi_socket_cb(__attribute__ ((unused)) CURL *easy, curl_socket_t sockfd, int what, WorkerState *wstate, void *socketp) { - static char *whatstrs[] = { "NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", "CURL_POLL_REMOVE" }; +int multi_socket_cb(__attribute__((unused)) CURL *easy, curl_socket_t sockfd, int what, + WorkerState *wstate, void *socketp) { + static char *whatstrs[] = {"NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", + "CURL_POLL_REMOVE"}; elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]); int epoll_op; - if(!socketp){ - epoll_op = EPOLL_CTL_ADD; + if (!socketp) { + epoll_op = EPOLL_CTL_ADD; bool socket_exists = true; curl_multi_assign(wstate->curl_mhandle, sockfd, &socket_exists); - } else if (what == CURL_POLL_REMOVE){ - epoll_op = EPOLL_CTL_DEL; + } else if (what == CURL_POLL_REMOVE) { + epoll_op = EPOLL_CTL_DEL; bool socket_exists = false; curl_multi_assign(wstate->curl_mhandle, sockfd, &socket_exists); } else { @@ -85,38 +92,36 @@ int multi_socket_cb(__attribute__ ((unused)) CURL *easy, curl_socket_t sockfd, i epoll_event ev = { .data.fd = sockfd, - .events = - (what & CURL_POLL_IN) ? - EPOLLIN: - (what & CURL_POLL_OUT) ? - EPOLLOUT: - 0, // no event is assigned since here we get CURL_POLL_REMOVE and the sockfd will be removed + .events = (what & CURL_POLL_IN) ? EPOLLIN + : (what & CURL_POLL_OUT) ? EPOLLOUT + : 0, // no event is assigned since here we get + // CURL_POLL_REMOVE and the sockfd will be removed }; // epoll_ctl will copy ev, so there's no need to do palloc for the epoll_event // https://github.com/torvalds/linux/blob/e32cde8d2bd7d251a8f9b434143977ddf13dcec6/fs/eventpoll.c#L2408-L2418 if (epoll_ctl(wstate->epfd, epoll_op, sockfd, &ev) < 0) { - int e = errno; - static char *opstrs[] = { "NONE", "EPOLL_CTL_ADD", "EPOLL_CTL_DEL", "EPOLL_CTL_MOD" }; - ereport(ERROR, errmsg("epoll_ctl with %s failed when receiving %s for sockfd %d: %s", whatstrs[what], opstrs[epoll_op], sockfd, strerror(e))); + int e = errno; + static char *opstrs[] = {"NONE", "EPOLL_CTL_ADD", "EPOLL_CTL_DEL", "EPOLL_CTL_MOD"}; + ereport(ERROR, errmsg("epoll_ctl with %s failed when receiving %s for sockfd %d: %s", + whatstrs[what], opstrs[epoll_op], sockfd, strerror(e))); } return 0; } -bool is_timer(event ev){ +bool is_timer(event ev) { return ev.data.fd == timerfd; } -int get_curl_event(event ev){ - int ev_bitmask = - ev.events & EPOLLIN ? CURL_CSELECT_IN: - ev.events & EPOLLOUT ? CURL_CSELECT_OUT: - CURL_CSELECT_ERR; +int get_curl_event(event ev) { + int ev_bitmask = ev.events & EPOLLIN ? CURL_CSELECT_IN + : ev.events & EPOLLOUT ? CURL_CSELECT_OUT + : CURL_CSELECT_ERR; return ev_bitmask; } -int get_socket_fd(event ev){ +int get_socket_fd(event ev) { return ev.data.fd; } @@ -124,29 +129,31 @@ int get_socket_fd(event ev){ typedef struct { curl_socket_t sockfd; - int action; -} SocketInfo ; + int action; +} SocketInfo; -int inline wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds){ - return kevent(fd, NULL, 0, events, maxevents, &(struct timespec){.tv_sec = timeout_milliseconds/1000}); +int inline wait_event(int fd, event *events, size_t maxevents, int timeout_milliseconds) { + return kevent(fd, NULL, 0, events, maxevents, + &(struct timespec){.tv_sec = timeout_milliseconds / 1000}); } -int inline event_monitor(){ +int inline event_monitor() { return kqueue(); } -void ev_monitor_close(WorkerState *wstate){ +void ev_monitor_close(WorkerState *wstate) { close(wstate->epfd); } -int multi_timer_cb(__attribute__ ((unused)) CURLM *multi, long timeout_ms, WorkerState *wstate) { +int multi_timer_cb(__attribute__((unused)) CURLM *multi, long timeout_ms, WorkerState *wstate) { elog(DEBUG2, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms); event timer_event; - int id = 1; + int id = 1; if (timeout_ms > 0) { - EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, 0, timeout_ms, NULL); //0 means milliseconds (the default) - } else if (timeout_ms == 0){ + EV_SET(&timer_event, id, EVFILT_TIMER, EV_ADD, 0, timeout_ms, + NULL); // 0 means milliseconds (the default) + } else if (timeout_ms == 0) { /* libcurl wants us to timeout now, however setting both fields of * new_value.it_value to zero disarms the timer. The closest we can * do is to schedule the timer to fire in 1 ns. */ @@ -164,13 +171,15 @@ int multi_timer_cb(__attribute__ ((unused)) CURLM *multi, long timeout_ms, Worke return 0; } -int multi_socket_cb(__attribute__ ((unused)) CURL *easy, curl_socket_t sockfd, int what, WorkerState *wstate, void *socketp) { - static char *whatstrs[] = { "NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", "CURL_POLL_REMOVE" }; +int multi_socket_cb(__attribute__((unused)) CURL *easy, curl_socket_t sockfd, int what, + WorkerState *wstate, void *socketp) { + static char *whatstrs[] = {"NONE", "CURL_POLL_IN", "CURL_POLL_OUT", "CURL_POLL_INOUT", + "CURL_POLL_REMOVE"}; elog(DEBUG2, "multi_socket_cb: sockfd %d received %s", sockfd, whatstrs[what]); - SocketInfo *sock_info = (SocketInfo *)socketp; + SocketInfo *sock_info = (SocketInfo *)socketp; struct kevent ev[2]; - int count = 0; + int count = 0; if (what == CURL_POLL_REMOVE) { if (sock_info->action & CURL_POLL_IN) @@ -183,34 +192,33 @@ int multi_socket_cb(__attribute__ ((unused)) CURL *easy, curl_socket_t sockfd, i pfree(sock_info); } else { if (!sock_info) { - sock_info = palloc(sizeof(SocketInfo)); + sock_info = palloc(sizeof(SocketInfo)); sock_info->sockfd = sockfd; sock_info->action = what; curl_multi_assign(wstate->curl_mhandle, sockfd, sock_info); } - if (what & CURL_POLL_IN) - EV_SET(&ev[count++], sockfd, EVFILT_READ, EV_ADD, 0, 0, sock_info); + if (what & CURL_POLL_IN) EV_SET(&ev[count++], sockfd, EVFILT_READ, EV_ADD, 0, 0, sock_info); - if (what & CURL_POLL_OUT) - EV_SET(&ev[count++], sockfd, EVFILT_WRITE, EV_ADD, 0, 0, sock_info); + if (what & CURL_POLL_OUT) EV_SET(&ev[count++], sockfd, EVFILT_WRITE, EV_ADD, 0, 0, sock_info); } Assert(count <= 2); if (kevent(wstate->epfd, &ev[0], count, NULL, 0, NULL) < 0) { int save_errno = errno; - ereport(ERROR, errmsg("kevent with %s failed for sockfd %d: %s", whatstrs[what], sockfd, strerror(save_errno))); + ereport(ERROR, errmsg("kevent with %s failed for sockfd %d: %s", whatstrs[what], sockfd, + strerror(save_errno))); } return 0; } -bool is_timer(event ev){ +bool is_timer(event ev) { return ev.filter == EVFILT_TIMER; } -int get_curl_event(event ev){ +int get_curl_event(event ev) { int ev_bitmask = 0; if (ev.filter == EVFILT_READ) ev_bitmask |= CURL_CSELECT_IN; @@ -222,8 +230,8 @@ int get_curl_event(event ev){ return ev_bitmask; } -int get_socket_fd(event ev){ - SocketInfo *sock_info = (SocketInfo *) ev.udata; +int get_socket_fd(event ev) { + SocketInfo *sock_info = (SocketInfo *)ev.udata; return sock_info->sockfd; } diff --git a/src/event.h b/src/event.h index a4d913c..96b2f5f 100644 --- a/src/event.h +++ b/src/event.h @@ -6,33 +6,33 @@ #include "core.h" #ifdef __linux__ -#define WAIT_USE_EPOLL +# define WAIT_USE_EPOLL #elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) -#define WAIT_USE_KQUEUE +# define WAIT_USE_KQUEUE #else -#error "no event loop implementation available" +# error "no event loop implementation available" #endif #ifdef WAIT_USE_EPOLL -#include -#include +# include +# include typedef struct epoll_event event; #else -#include +# include typedef struct kevent event; #endif -int wait_event(int fd, event *events, size_t maxevents, int wait_milliseconds); -int event_monitor(void); +int wait_event(int fd, event *events, size_t maxevents, int wait_milliseconds); +int event_monitor(void); void ev_monitor_close(WorkerState *wstate); -int multi_timer_cb(CURLM *multi, long timeout_ms, WorkerState *wstate); +int multi_timer_cb(CURLM *multi, long timeout_ms, WorkerState *wstate); int multi_socket_cb(CURL *easy, curl_socket_t sockfd, int what, WorkerState *wstate, void *socketp); bool is_timer(event ev); -int get_curl_event(event ev); -int get_socket_fd(event ev); +int get_curl_event(event ev); +int get_socket_fd(event ev); #endif diff --git a/src/pg_prelude.h b/src/pg_prelude.h index f065768..ec1a3de 100644 --- a/src/pg_prelude.h +++ b/src/pg_prelude.h @@ -7,28 +7,29 @@ #pragma GCC diagnostic ignored "-Wsign-compare" #include -#include -#include -#include -#include -#include + +#include "commands/dbcommands.h" #include "storage/lmgr.h" -#include -#include -#include #include +#include #include #include #include #include #include #include -#include "commands/dbcommands.h" #include #include #include #include #include +#include +#include +#include +#include +#include +#include +#include #include #include #include @@ -60,24 +61,23 @@ void destroyStringInfo(StringInfo str); #ifdef PG_PRELUDE_IMPL -const char *xact_event_name(XactEvent event){ +const char *xact_event_name(XactEvent event) { switch (event) { - case XACT_EVENT_COMMIT: return "XACT_EVENT_COMMIT"; - case XACT_EVENT_PARALLEL_COMMIT: return "XACT_EVENT_PARALLEL_COMMIT"; - case XACT_EVENT_ABORT: return "XACT_EVENT_ABORT"; - case XACT_EVENT_PARALLEL_ABORT: return "XACT_EVENT_PARALLEL_ABORT"; - case XACT_EVENT_PREPARE: return "XACT_EVENT_PREPARE"; - case XACT_EVENT_PRE_COMMIT: return "XACT_EVENT_PRE_COMMIT"; - case XACT_EVENT_PARALLEL_PRE_COMMIT: return "XACT_EVENT_PARALLEL_PRE_COMMIT"; - case XACT_EVENT_PRE_PREPARE: return "XACT_EVENT_PRE_PREPARE"; - default: return "(unknown XactEvent)"; + case XACT_EVENT_COMMIT : return "XACT_EVENT_COMMIT"; + case XACT_EVENT_PARALLEL_COMMIT : return "XACT_EVENT_PARALLEL_COMMIT"; + case XACT_EVENT_ABORT : return "XACT_EVENT_ABORT"; + case XACT_EVENT_PARALLEL_ABORT : return "XACT_EVENT_PARALLEL_ABORT"; + case XACT_EVENT_PREPARE : return "XACT_EVENT_PREPARE"; + case XACT_EVENT_PRE_COMMIT : return "XACT_EVENT_PRE_COMMIT"; + case XACT_EVENT_PARALLEL_PRE_COMMIT: return "XACT_EVENT_PARALLEL_PRE_COMMIT"; + case XACT_EVENT_PRE_PREPARE : return "XACT_EVENT_PRE_PREPARE"; + default : return "(unknown XactEvent)"; } } - #if PG17_LT // Polyfill for pg < 17 -// see https://github.com/postgres/postgres/blob/3c4e26a62c31ebe296e3aedb13ac51a7a35103bd/src/common/stringinfo.c#L402-L416 +// https://github.com/postgres/postgres/blob/3c4e26a62c31ebe296e3aedb13ac51a7a35103bd/src/common/stringinfo.c#L402-L416 void destroyStringInfo(StringInfo str) { Assert(str->maxlen != 0); pfree(str->data); diff --git a/src/util.c b/src/util.c index cc41703..13e8f1c 100644 --- a/src/util.c +++ b/src/util.c @@ -1,67 +1,65 @@ #include "pg_prelude.h" + #include "curl_prelude.h" + #include "util.h" PG_FUNCTION_INFO_V1(_urlencode_string); PG_FUNCTION_INFO_V1(_encode_url_with_params_array); Datum _urlencode_string(PG_FUNCTION_ARGS) { - if(PG_GETARG_POINTER(0) == NULL) - PG_RETURN_NULL(); + if (PG_GETARG_POINTER(0) == NULL) PG_RETURN_NULL(); - char *str = text_to_cstring(PG_GETARG_TEXT_P(0)); - char *urlencoded_str = NULL; + char *str = text_to_cstring(PG_GETARG_TEXT_P(0)); + char *urlencoded_str = NULL; - urlencoded_str = curl_escape(str, strlen(str)); + urlencoded_str = curl_escape(str, strlen(str)); - pfree(str); + pfree(str); - PG_RETURN_TEXT_P(cstring_to_text(urlencoded_str)); + PG_RETURN_TEXT_P(cstring_to_text(urlencoded_str)); } Datum _encode_url_with_params_array(PG_FUNCTION_ARGS) { - if(PG_GETARG_POINTER(0) == NULL || PG_GETARG_POINTER(1) == NULL) - PG_RETURN_NULL(); + if (PG_GETARG_POINTER(0) == NULL || PG_GETARG_POINTER(1) == NULL) PG_RETURN_NULL(); - char *url = text_to_cstring(PG_GETARG_TEXT_P(0)); - ArrayType *params = PG_GETARG_ARRAYTYPE_P(1); + char *url = text_to_cstring(PG_GETARG_TEXT_P(0)); + ArrayType *params = PG_GETARG_ARRAYTYPE_P(1); - char *full_url = NULL; + char *full_url = NULL; - ArrayIterator iterator; - Datum value; - bool isnull; - char *param; + ArrayIterator iterator; + Datum value; + bool isnull; + char *param; - CURLU *h = curl_url(); - CURLUcode rc = curl_url_set(h, CURLUPART_URL, url, 0); - if (rc != CURLUE_OK) { - // TODO: Use curl_url_strerror once released. - elog(ERROR, "%s", curl_easy_strerror((CURLcode)rc)); - } + CURLU *h = curl_url(); + CURLUcode rc = curl_url_set(h, CURLUPART_URL, url, 0); + if (rc != CURLUE_OK) { + // TODO: Use curl_url_strerror once released. + elog(ERROR, "%s", curl_easy_strerror((CURLcode)rc)); + } - iterator = array_create_iterator(params, 0, NULL); - while (array_iterate(iterator, &value, &isnull)) { - if (isnull) - continue; - - param = TextDatumGetCString(value); - rc = curl_url_set(h, CURLUPART_QUERY, param, CURLU_APPENDQUERY); - if (rc != CURLUE_OK) { - elog(ERROR, "curl_url returned: %d", rc); - } - pfree(param); - } - array_free_iterator(iterator); + iterator = array_create_iterator(params, 0, NULL); + while (array_iterate(iterator, &value, &isnull)) { + if (isnull) continue; - rc = curl_url_get(h, CURLUPART_URL, &full_url, 0); + param = TextDatumGetCString(value); + rc = curl_url_set(h, CURLUPART_QUERY, param, CURLU_APPENDQUERY); if (rc != CURLUE_OK) { - elog(ERROR, "curl_url returned: %d", rc); + elog(ERROR, "curl_url returned: %d", rc); } + pfree(param); + } + array_free_iterator(iterator); - pfree(url); - curl_url_cleanup(h); + rc = curl_url_get(h, CURLUPART_URL, &full_url, 0); + if (rc != CURLUE_OK) { + elog(ERROR, "curl_url returned: %d", rc); + } - PG_RETURN_TEXT_P(cstring_to_text(full_url)); -} + pfree(url); + curl_url_cleanup(h); + PG_RETURN_TEXT_P(cstring_to_text(full_url)); +} diff --git a/src/worker.c b/src/worker.c index 63d1194..fbfbfe5 100644 --- a/src/worker.c +++ b/src/worker.c @@ -1,19 +1,26 @@ -#include #include -#include #include +#include +#include #define PG_PRELUDE_IMPL #include "pg_prelude.h" + #include "curl_prelude.h" -#include "util.h" -#include "errors.h" + #include "core.h" +#include "errors.h" #include "event.h" +#include "util.h" -#define MIN_LIBCURL_VERSION_NUM 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h -#define REQUIRED_LIBCURL_ERR_MSG "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this version" -_Static_assert(LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5). +#define MIN_LIBCURL_VERSION_NUM \ + 0x075300 // This is the 7.83.0 version in hex as defined in curl/curlver.h +#define REQUIRED_LIBCURL_ERR_MSG \ + "libcurl >= 7.83.0 is required, we use the curl_easy_nextheader() function added in this " \ + "version" +_Static_assert(LIBCURL_VERSION_NUM, + REQUIRED_LIBCURL_ERR_MSG); // test for older libcurl versions that don't even have + // LIBCURL_VERSION_NUM defined (e.g. libcurl 6.5). _Static_assert(LIBCURL_VERSION_NUM >= MIN_LIBCURL_VERSION_NUM, REQUIRED_LIBCURL_ERR_MSG); PG_MODULE_MAGIC; @@ -25,45 +32,46 @@ typedef enum { static WorkerState *worker_state = NULL; -static const int curl_handle_event_timeout_ms = 1000; -static const int net_worker_restart_time_sec = 1; -static const long no_timeout = -1L; -static bool wake_commit_cb_active = false; -static bool worker_should_restart = false; -static const size_t total_extension_tables = 2; +static const int curl_handle_event_timeout_ms = 1000; +static const int net_worker_restart_time_sec = 1; +static const long no_timeout = -1L; +static bool wake_commit_cb_active = false; +static bool worker_should_restart = false; +static const size_t total_extension_tables = 2; -static char* guc_ttl; -static int guc_batch_size; -static char* guc_database_name; -static char* guc_username; +static char *guc_ttl; +static int guc_batch_size; +static char *guc_database_name; +static char *guc_username; #if PG15_GTE -static shmem_request_hook_type prev_shmem_request_hook = NULL; +static shmem_request_hook_type prev_shmem_request_hook = NULL; #endif -static shmem_startup_hook_type prev_shmem_startup_hook = NULL; -static volatile sig_atomic_t got_sighup = false; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; +static volatile sig_atomic_t got_sighup = false; void _PG_init(void); #if PG_VERSION_NUM >= 180000 - PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg); +PGDLLEXPORT pg_noreturn void pg_net_worker(Datum main_arg); #else - PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn(); +PGDLLEXPORT void pg_net_worker(Datum main_arg) pg_attribute_noreturn(); #endif PG_FUNCTION_INFO_V1(worker_restart); -Datum worker_restart(__attribute__ ((unused)) PG_FUNCTION_ARGS) { - bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum) NULL)); // reload the config +Datum worker_restart(__attribute__((unused)) PG_FUNCTION_ARGS) { + bool result = DatumGetBool(DirectFunctionCall1(pg_reload_conf, (Datum)NULL)); // reload the config pg_atomic_write_u32(&worker_state->got_restart, 1); pg_write_barrier(); - if(worker_state->shared_latch) - SetLatch(worker_state->shared_latch); - PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain backward compatibility + if (worker_state->shared_latch) SetLatch(worker_state->shared_latch); + PG_RETURN_BOOL(result); // TODO is not necessary to return a bool here, but we do it to maintain + // backward compatibility } -static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){ - if (pg_atomic_read_u32(&ws->status) == expected_status) // fast return without sleeping, in case condition is fulfilled +static void wait_until_state(WorkerState *ws, WorkerStatus expected_status) { + if (pg_atomic_read_u32(&ws->status) == + expected_status) // fast return without sleeping, in case condition is fulfilled return; ConditionVariablePrepareToSleep(&ws->cv); @@ -74,46 +82,47 @@ static void wait_until_state(WorkerState *ws, WorkerStatus expected_status){ } PG_FUNCTION_INFO_V1(wait_until_running); -Datum wait_until_running(__attribute__ ((unused)) PG_FUNCTION_ARGS){ +Datum wait_until_running(__attribute__((unused)) PG_FUNCTION_ARGS) { wait_until_state(worker_state, WS_RUNNING); PG_RETURN_VOID(); } // only wake at commit time to prevent excessive and unnecessary wakes. -// e.g only one wake when doing `select net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);` -static void wake_at_commit(XactEvent event, __attribute__ ((unused)) void *arg){ +// e.g only one wake when doing `select +// net.http_get('http://localhost:8080/pathological?status=200') from generate_series(1,100000);` +static void wake_at_commit(XactEvent event, __attribute__((unused)) void *arg) { elog(DEBUG2, "pg_net xact callback received: %s", xact_event_name(event)); - switch(event){ - case XACT_EVENT_COMMIT: - case XACT_EVENT_PARALLEL_COMMIT: - if(wake_commit_cb_active){ - uint32 expected = 0; - bool success = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1); - pg_write_barrier(); + switch (event) { + case XACT_EVENT_COMMIT: + case XACT_EVENT_PARALLEL_COMMIT: + if (wake_commit_cb_active) { + uint32 expected = 0; + bool success = pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 1); + pg_write_barrier(); - if (success) // only wake the worker on first put, so if many concurrent wakes come we only wake once - SetLatch(worker_state->shared_latch); + if (success) // only wake the worker on first put, so if many concurrent wakes come we only + // wake once + SetLatch(worker_state->shared_latch); - wake_commit_cb_active = false; - } - break; - // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the worker automatically, they require a manual `net.wake()` - // These are disabled by default and rarely used, see `max_prepared_transactions` https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS - case XACT_EVENT_PREPARE: - // abort the callback on rollback - case XACT_EVENT_ABORT: - case XACT_EVENT_PARALLEL_ABORT: wake_commit_cb_active = false; - break; - default: - break; + } + break; + // TODO: `PREPARE TRANSACTION 'xx';` and `COMMIT PREPARED TRANSACTION 'xx';` do not wake the + // worker automatically, they require a manual `net.wake()` These are disabled by default and + // rarely used, see `max_prepared_transactions` + // https://www.postgresql.org/docs/17/runtime-config-resource.html#GUC-MAX-PREPARED-TRANSACTIONS + case XACT_EVENT_PREPARE: + // abort the callback on rollback + case XACT_EVENT_ABORT: + case XACT_EVENT_PARALLEL_ABORT: wake_commit_cb_active = false; break; + default : break; } } PG_FUNCTION_INFO_V1(wake); -Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) { +Datum wake(__attribute__((unused)) PG_FUNCTION_ARGS) { if (!wake_commit_cb_active) { // register only one callback per transaction RegisterXactCallback(wake_at_commit, NULL); wake_commit_cb_active = true; @@ -122,24 +131,18 @@ Datum wake(__attribute__ ((unused)) PG_FUNCTION_ARGS) { PG_RETURN_VOID(); } -static void -handle_sigterm(__attribute__ ((unused)) SIGNAL_ARGS) -{ +static void handle_sigterm(__attribute__((unused)) SIGNAL_ARGS) { int save_errno = errno; pg_atomic_write_u32(&worker_state->got_restart, 1); pg_write_barrier(); - if(worker_state->shared_latch) - SetLatch(worker_state->shared_latch); + if (worker_state->shared_latch) SetLatch(worker_state->shared_latch); errno = save_errno; } -static void -handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS) -{ - int save_errno = errno; - got_sighup = true; - if(worker_state->shared_latch) - SetLatch(worker_state->shared_latch); +static void handle_sighup(__attribute__((unused)) SIGNAL_ARGS) { + int save_errno = errno; + got_sighup = true; + if (worker_state->shared_latch) SetLatch(worker_state->shared_latch); errno = save_errno; } @@ -149,12 +152,9 @@ handle_sighup(__attribute__ ((unused)) SIGNAL_ARGS) *DROP DATATABASE from finishing since our worker would be sleeping and not reach *CHECK_FOR_INTERRUPTS() */ -static void -handle_sigusr1(SIGNAL_ARGS) -{ - int save_errno = errno; - if(worker_state->shared_latch) - SetLatch(worker_state->shared_latch); +static void handle_sigusr1(SIGNAL_ARGS) { + int save_errno = errno; + if (worker_state->shared_latch) SetLatch(worker_state->shared_latch); errno = save_errno; procsignal_sigusr1_handler(postgres_signal_arg); } @@ -165,10 +165,10 @@ static void publish_state(WorkerStatus s) { ConditionVariableBroadcast(&worker_state->cv); } -static void -net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum arg){ +static void net_on_exit(__attribute__((unused)) int code, __attribute__((unused)) Datum arg) { worker_should_restart = false; - pg_atomic_write_u32(&worker_state->should_wake, 1); // ensure the remaining work will continue since we'll restart + pg_atomic_write_u32(&worker_state->should_wake, + 1); // ensure the remaining work will continue since we'll restart worker_state->shared_latch = NULL; @@ -179,19 +179,18 @@ net_on_exit(__attribute__ ((unused)) int code, __attribute__ ((unused)) Datum ar } // wait according to the wait type while ensuring interrupts are processed while waiting -static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart){ - switch(ww){ - case WORKER_WAIT_NO_TIMEOUT: - WaitLatch(worker_state->shared_latch, - WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, - no_timeout, - PG_WAIT_EXTENSION); - ResetLatch(worker_state->shared_latch); - break; - case WORKER_WAIT_ONE_SECOND: - WaitLatch(worker_state->shared_latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000, PG_WAIT_EXTENSION); - ResetLatch(worker_state->shared_latch); - break; +static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart) { + switch (ww) { + case WORKER_WAIT_NO_TIMEOUT: + WaitLatch(worker_state->shared_latch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, no_timeout, + PG_WAIT_EXTENSION); + ResetLatch(worker_state->shared_latch); + break; + case WORKER_WAIT_ONE_SECOND: + WaitLatch(worker_state->shared_latch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000, + PG_WAIT_EXTENSION); + ResetLatch(worker_state->shared_latch); + break; } CHECK_FOR_INTERRUPTS(); @@ -201,22 +200,23 @@ static void wait_while_processing_interrupts(WorkerWait ww, bool *should_restart ProcessConfigFile(PGC_SIGHUP); } - if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)){ + if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)) { *should_restart = true; } } -static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]){ +static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables]) { Oid net_oid = get_namespace_oid("net", true); - if(!OidIsValid(net_oid)){ + if (!OidIsValid(net_oid)) { return false; } Oid queue_oid = get_relname_relid("http_request_queue", net_oid); - Oid resp_oid = get_relname_relid("_http_response", net_oid); + Oid resp_oid = get_relname_relid("_http_response", net_oid); - bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && ConditionalLockRelationOid(resp_oid, AccessShareLock); + bool is_locked = ConditionalLockRelationOid(queue_oid, AccessShareLock) && + ConditionalLockRelationOid(resp_oid, AccessShareLock); if (is_locked) { ext_table_oids[0] = queue_oid; @@ -226,12 +226,12 @@ static bool is_extension_locked(Oid ext_table_oids[static total_extension_tables return is_locked; } -static void unlock_extension(Oid ext_table_oids[static total_extension_tables]){ +static void unlock_extension(Oid ext_table_oids[static total_extension_tables]) { UnlockRelationOid(ext_table_oids[0], AccessShareLock); UnlockRelationOid(ext_table_oids[1], AccessShareLock); } -void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { +void pg_net_worker(__attribute__((unused)) Datum main_arg) { worker_state->shared_latch = &MyProc->procLatch; on_proc_exit(net_on_exit, 0); @@ -243,10 +243,13 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { BackgroundWorkerInitializeConnection(guc_database_name, guc_username, 0); pgstat_report_appname("pg_net " EXTVERSION); // set appname for pg_stat_activity - elog(INFO, "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, pg_net.username=%s, pg_net.database_name=%s", guc_ttl, guc_batch_size, guc_username, guc_database_name); + elog(INFO, + "pg_net worker started with a config of: pg_net.ttl=%s, pg_net.batch_size=%d, " + "pg_net.username=%s, pg_net.database_name=%s", + guc_ttl, guc_batch_size, guc_username, guc_database_name); int curl_ret = curl_global_init(CURL_GLOBAL_ALL); - if(curl_ret != CURLE_OK) + if (curl_ret != CURLE_OK) ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(curl_ret))); worker_state->epfd = event_monitor(); @@ -256,8 +259,7 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { } worker_state->curl_mhandle = curl_multi_init(); - if(!worker_state->curl_mhandle) - ereport(ERROR, errmsg("curl_multi_init()")); + if (!worker_state->curl_mhandle) ereport(ERROR, errmsg("curl_multi_init()")); set_curl_mhandle(worker_state); @@ -266,7 +268,7 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { do { uint32 expected = 1; - if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)){ + if (!pg_atomic_compare_exchange_u32(&worker_state->should_wake, &expected, 0)) { elog(DEBUG1, "pg_net worker waiting for wake"); wait_while_processing_interrupts(WORKER_WAIT_NO_TIMEOUT, &worker_should_restart); continue; @@ -282,7 +284,7 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { Oid ext_table_oids[total_extension_tables]; - if(!is_extension_locked(ext_table_oids)){ + if (!is_extension_locked(ext_table_oids)) { elog(DEBUG1, "pg_net extension not loaded"); PopActiveSnapshot(); AbortCurrentTransaction(); @@ -293,38 +295,39 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { expired_responses = delete_expired_responses(guc_ttl, guc_batch_size); - elog(DEBUG1, "Deleted "UINT64_FORMAT" expired rows", expired_responses); + elog(DEBUG1, "Deleted " UINT64_FORMAT " expired rows", expired_responses); requests_consumed = consume_request_queue(guc_batch_size); - elog(DEBUG1, "Consumed "UINT64_FORMAT" request rows", requests_consumed); + elog(DEBUG1, "Consumed " UINT64_FORMAT " request rows", requests_consumed); - if(requests_consumed > 0){ + if (requests_consumed > 0) { CurlHandle *handles = palloc(mul_size(sizeof(CurlHandle), requests_consumed)); // initialize curl handles for (size_t j = 0; j < requests_consumed; j++) { - init_curl_handle(&handles[j], get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc)); + init_curl_handle(&handles[j], + get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc)); - EREPORT_MULTI( - curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle) - ); + EREPORT_MULTI(curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle)); } // start curl event loop - int running_handles = 0; - int maxevents = requests_consumed + 1; // 1 extra for the timer + int running_handles = 0; + int maxevents = requests_consumed + 1; // 1 extra for the timer event events[maxevents]; do { - int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms); + int nfds = + wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms); + if (nfds < 0) { int save_errno = errno; - if(save_errno == EINTR) { // can happen when the wait is interrupted, for example when running under GDB. Just continue in this case. + if (save_errno == EINTR) { // can happen when the wait is interrupted, for example when + // running under GDB. Just continue in this case. elog(DEBUG1, "wait_event() got %s, continuing", strerror(save_errno)); continue; - } - else { + } else { ereport(ERROR, errmsg("wait_event() failed: %s", strerror(save_errno))); break; } @@ -332,28 +335,24 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { for (int i = 0; i < nfds; i++) { if (is_timer(events[i])) { - EREPORT_MULTI( - curl_multi_socket_action(worker_state->curl_mhandle, CURL_SOCKET_TIMEOUT, 0, &running_handles) - ); + EREPORT_MULTI(curl_multi_socket_action(worker_state->curl_mhandle, + CURL_SOCKET_TIMEOUT, 0, &running_handles)); } else { int curl_event = get_curl_event(events[i]); - int sockfd = get_socket_fd(events[i]); - - EREPORT_MULTI( - curl_multi_socket_action( - worker_state->curl_mhandle, - sockfd, - curl_event, - &running_handles) - ); + int sockfd = get_socket_fd(events[i]); + + EREPORT_MULTI(curl_multi_socket_action(worker_state->curl_mhandle, sockfd, curl_event, + &running_handles)); } } // insert finished responses - CURLMsg *msg = NULL; int msgs_left=0; + CURLMsg *msg = NULL; + int msgs_left = 0; while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) { if (msg->msg == CURLMSG_DONE) { - CurlHandle *handle = NULL; EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle); + CurlHandle *handle = NULL; + EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle); insert_response(handle, msg->data.result); } else { ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg)); @@ -361,13 +360,13 @@ void pg_net_worker(__attribute__ ((unused)) Datum main_arg) { } elog(DEBUG1, "Pending curl running_handles: %d", running_handles); - } while (running_handles > 0); // run while there are curl handles, some won't finish in a single iteration since they could be slow and waiting for a timeout + // run while there are curl handles, some won't finish in a single iteration since they + // could be slow and waiting for a timeout + } while (running_handles > 0); // cleanup - for(uint64 i = 0; i < requests_consumed; i++){ - EREPORT_MULTI( - curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle) - ); + for (uint64 i = 0; i < requests_consumed; i++) { + EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle)); curl_easy_cleanup(handles[i].ez_handle); @@ -403,16 +402,14 @@ static Size net_memsize(void) { #if PG15_GTE static void net_shmem_request(void) { - if (prev_shmem_request_hook) - prev_shmem_request_hook(); + if (prev_shmem_request_hook) prev_shmem_request_hook(); RequestAddinShmemSpace(net_memsize()); } #endif static void net_shmem_startup(void) { - if (prev_shmem_startup_hook) - prev_shmem_startup_hook(); + if (prev_shmem_startup_hook) prev_shmem_startup_hook(); bool found; @@ -427,7 +424,7 @@ static void net_shmem_startup(void) { worker_state->shared_latch = NULL; ConditionVariableInit(&worker_state->cv); - worker_state->epfd = 0; + worker_state->epfd = 0; worker_state->curl_mhandle = NULL; } @@ -436,64 +433,46 @@ static void net_shmem_startup(void) { void _PG_init(void) { if (IsBinaryUpgrade) { - return; + return; } if (!process_shared_preload_libraries_in_progress) { - ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"), - errhint("Add pg_net to the shared_preload_libraries " - "configuration variable in postgresql.conf.")); + ereport(ERROR, errmsg("pg_net is not in shared_preload_libraries"), + errhint("Add pg_net to the shared_preload_libraries " + "configuration variable in postgresql.conf.")); } RegisterBackgroundWorker(&(BackgroundWorker){ - .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, - .bgw_start_time = BgWorkerStart_RecoveryFinished, - .bgw_library_name = "pg_net", + .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, + .bgw_start_time = BgWorkerStart_RecoveryFinished, + .bgw_library_name = "pg_net", .bgw_function_name = "pg_net_worker", - .bgw_name = "pg_net " EXTVERSION " worker", - .bgw_restart_time = net_worker_restart_time_sec, + .bgw_name = "pg_net " EXTVERSION " worker", + .bgw_restart_time = net_worker_restart_time_sec, }); #if PG15_GTE prev_shmem_request_hook = shmem_request_hook; - shmem_request_hook = net_shmem_request; + shmem_request_hook = net_shmem_request; #else RequestAddinShmemSpace(net_memsize()); #endif prev_shmem_startup_hook = shmem_startup_hook; - shmem_startup_hook = net_shmem_startup; - - DefineCustomStringVariable("pg_net.ttl", - "time to live for request/response rows", - "should be a valid interval type", - &guc_ttl, - "6 hours", - PGC_SIGHUP, 0, - NULL, NULL, NULL); - - DefineCustomIntVariable("pg_net.batch_size", - "number of requests executed in one iteration of the background worker", - NULL, - &guc_batch_size, - 200, - 0, PG_INT16_MAX, - PGC_SIGHUP, 0, - NULL, NULL, NULL); - - DefineCustomStringVariable("pg_net.database_name", - "Database where the worker will connect to", - NULL, - &guc_database_name, - "postgres", - PGC_SU_BACKEND, 0, - NULL, NULL, NULL); - - DefineCustomStringVariable("pg_net.username", - "Connection user for the worker", - NULL, - &guc_username, - NULL, - PGC_SU_BACKEND, 0, - NULL, NULL, NULL); + shmem_startup_hook = net_shmem_startup; + + DefineCustomStringVariable("pg_net.ttl", "time to live for request/response rows", + "should be a valid interval type", &guc_ttl, "6 hours", PGC_SIGHUP, 0, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "pg_net.batch_size", "number of requests executed in one iteration of the background worker", + NULL, &guc_batch_size, 200, 0, PG_INT16_MAX, PGC_SIGHUP, 0, NULL, NULL, NULL); + + DefineCustomStringVariable("pg_net.database_name", "Database where the worker will connect to", + NULL, &guc_database_name, "postgres", PGC_SU_BACKEND, 0, NULL, NULL, + NULL); + + DefineCustomStringVariable("pg_net.username", "Connection user for the worker", NULL, + &guc_username, NULL, PGC_SU_BACKEND, 0, NULL, NULL, NULL); }