diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d212c07..7632740 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -579,6 +579,62 @@ jobs: if: always() run: docker rm -f pgque-ts-test + ruby-client-tests: + name: Ruby client tests + runs-on: ubuntu-latest + env: + PGQUE_TEST_DSN: postgresql://postgres:pgque_test@localhost/pgque_test + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Start PostgreSQL 18 + run: | + set -Eeuo pipefail + docker run -d --name pgque-ruby-test \ + -e POSTGRES_PASSWORD=pgque_test \ + -e POSTGRES_DB=pgque_test \ + -p 5432:5432 \ + postgres:18 + + for i in $(seq 1 30); do + docker exec pgque-ruby-test pg_isready -U postgres && break + sleep 1 + done || { echo "PG not ready after 30 seconds"; exit 1; } + + - name: Build pgque + run: bash build/transform.sh + + - name: Install pgque + run: | + set -Eeuo pipefail + PGPASSWORD=pgque_test psql -h localhost -U postgres -d pgque_test \ + -v ON_ERROR_STOP=1 -f sql/pgque.sql + + - name: Setup Ruby + uses: ruby/setup-ruby@v1 + with: + ruby-version: "3.3" + working-directory: clients/ruby + bundler-cache: true + + - name: Ruby client test suite + run: | + set -Eeuo pipefail + cd clients/ruby + bundle exec rake test + + - name: Ruby package smoke + run: | + set -Eeuo pipefail + cd clients/ruby + gem build pgque.gemspec + + - name: Cleanup Ruby test DB + if: always() + run: docker rm -f pgque-ruby-test + verify: runs-on: ubuntu-latest steps: diff --git a/.github/workflows/release-ruby.yml b/.github/workflows/release-ruby.yml new file mode 100644 index 0000000..7e64dee --- /dev/null +++ b/.github/workflows/release-ruby.yml @@ -0,0 +1,127 @@ +name: Release Ruby client + +on: + workflow_dispatch: + inputs: + version: + description: "Version to publish, matching clients/ruby/lib/pgque/version.rb" + required: true + type: string + dry_run: + description: "Build and validate without publishing" + required: true + default: true + type: boolean + +permissions: + contents: read + +jobs: + build: + if: github.ref == 'refs/heads/main' + runs-on: ubuntu-latest + env: + VERSION: ${{ inputs.version }} + defaults: + run: + working-directory: clients/ruby + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ github.sha }} + + - uses: ruby/setup-ruby@v1 + with: + ruby-version: "3.3" + working-directory: clients/ruby + bundler-cache: true + + - name: Verify version input + run: | + set -Eeuo pipefail + # Reject malformed input early. Gem::Version is permissive + # (accepts 0.2.0, 0.2.0.rc.1, 0.2.0.alpha, 0.2.0.beta.1, ...) + # but rejects clear garbage like "x.y.z" or empty strings. + ruby -rrubygems -e "Gem::Version.new(ENV.fetch('VERSION'))" + + # The version constant must already match the input -- bumping + # version.rb is the responsibility of the release-prep PR, not + # this workflow. Any drift here is a configuration error. + actual=$(ruby -e 'require_relative "lib/pgque/version"; print Pgque::VERSION') + test "$actual" = "$VERSION" || { + echo "version input ${VERSION} != lib/pgque/version.rb ${actual}" + exit 1 + } + + - name: Build gem + run: gem build pgque.gemspec + + - name: Verify built gem installs + run: | + set -Eeuo pipefail + mkdir -p /tmp/pgque-gem-check + gem install --install-dir /tmp/pgque-gem-check --no-document \ + "./pgque-${VERSION}.gem" + GEM_PATH=/tmp/pgque-gem-check GEM_HOME=/tmp/pgque-gem-check \ + ruby -e ' + require "pgque" + expected = ENV.fetch("VERSION") + raise "version mismatch: expected #{expected}, got #{Pgque::VERSION}" \ + unless Pgque::VERSION == expected + raise "Pgque::Client missing" unless defined?(Pgque::Client) + raise "Pgque::Consumer missing" unless defined?(Pgque::Consumer) + puts "install verified: pgque #{Pgque::VERSION}" + ' + + publish-rubygems: + if: ${{ !inputs.dry_run }} + needs: build + runs-on: ubuntu-latest + environment: rubygems + permissions: + # contents:write is required so `rake release` (invoked by + # rubygems/release-gem) can push the v${VERSION} git tag back to + # origin. id-token:write is for the OIDC handshake with + # rubygems.org. + contents: write + id-token: write + defaults: + run: + working-directory: clients/ruby + steps: + # Check out the branch ref (not the bare SHA) so we land on an + # attached HEAD; bundler's release:source_control_push runs plain + # `git push`, which fails from detached HEAD. The build job (run + # immediately before this one) already verified the version + # against the dispatch SHA, so any race with main moving during + # the workflow would have to land a new commit AND a version + # bump in that window -- vanishingly small. + - uses: actions/checkout@v4 + with: + ref: ${{ github.ref }} + # Full history so existing tags are visible to release:guard_clean. + fetch-depth: 0 + + - uses: ruby/setup-ruby@v1 + with: + ruby-version: "3.3" + working-directory: clients/ruby + bundler-cache: true + + - name: Configure git identity for tag push + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + + # rubygems/release-gem performs the OIDC handshake (no long-lived + # RUBYGEMS_API_KEY secret needed) and then runs + # `bundle exec rake release`, which depends on the gem tasks + # provided by `require "bundler/gem_tasks"` in clients/ruby/Rakefile. + # The chain is: build -> release:guard_clean -> release:source_control_push + # (tags v${VERSION} and pushes the tag) -> release:rubygem_push. + - name: Publish to RubyGems via OIDC + uses: rubygems/release-gem@v1 + with: + working-directory: clients/ruby + setup-trusted-publisher: true + await-release: true diff --git a/.gitignore b/.gitignore index 4d5ed84..536e1c5 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,10 @@ build/output/ clients/python/*.egg-info/ clients/typescript/node_modules/ clients/typescript/package-lock.json +clients/ruby/Gemfile.lock +clients/ruby/pkg/ +clients/ruby/.bundle/ +clients/ruby/*.gem # Claude Code agent worktrees (ephemeral isolation per agent run) .claude/worktrees/ diff --git a/README.md b/README.md index 724e79a..bdf40d8 100644 --- a/README.md +++ b/README.md @@ -336,7 +336,7 @@ Longer walkthrough in the [tutorial](docs/tutorial.md); patterns like fan-out, e ## Client libraries -PgQue is SQL-first, so any Postgres driver works. First-party client libraries live in this repo for **Python**, **Go**, and **TypeScript**, all published at `v0.2.0`. +PgQue is SQL-first, so any Postgres driver works. First-party client libraries live in this repo for **Python**, **Go**, and **TypeScript** (all published at `v0.2.0`), plus **Ruby**, shipping in v0.3. ### Python (`pgque-py`) — psycopg 3 @@ -410,6 +410,32 @@ try { } ``` +### Ruby (`pgque`) — pg gem + +```bash +gem install pgque --pre # or pin: gem "pgque", "0.3.0.rc.1" +``` + +```ruby +require "pgque" + +Pgque.connect("postgresql://localhost/mydb") do |client| + # one-time setup (typically in a migration) + client.conn.exec("select pgque.create_queue('orders')") + client.conn.exec("select pgque.subscribe('orders', 'processor')") + + client.send("orders", { "order_id" => 42 }, type: "order.created") +end + +consumer = Pgque::Consumer.new( + "postgresql://localhost/mydb", + queue: "orders", + name: "processor", +) +consumer.on("order.created") { |msg| process_order(msg.payload) } +consumer.start # blocks until SIGTERM/SIGINT; needs pgque.ticker() running +``` + ### Any language ```sql diff --git a/clients/README.md b/clients/README.md index d7ee1be..2c00fcb 100644 --- a/clients/README.md +++ b/clients/README.md @@ -1,6 +1,6 @@ # PgQue clients -PgQue ships three first-party clients. They are thin wrappers over `pgque.*` +PgQue ships four first-party clients. They are thin wrappers over `pgque.*` SQL primitives. The matrix below tracks the public client API on current `main`. @@ -74,34 +74,33 @@ users to install `--pre`, `@rc`, or an `-rc` Go tag. ## Current parity matrix -| Capability | Python | Go | TypeScript | -| --- | :---: | :---: | :---: | -| `connect` / `close` | ✓ | ✓ | ✓ | -| Raw SQL escape hatch | ✓ (`conn`) | ✓ (`Pool()`) | ✓ (`rawPool`) | -| PgQue-classified errors | ✓ | ✓ | ✓ | -| Lossless PostgreSQL `bigint` IDs | ✓ (`int`) | ✓ (`int64`) | ✓ (`bigint`) | -| `send` | ✓ | ✓ | ✓ | -| `send_batch` / `SendBatch` / `sendBatch` | ✓ | ✓ | ✓ | -| `receive` | ✓ | ✓ | ✓ | -| `ack` returns SQL rowcount (0 stale, 1 success) | ✓ (int) | ✓ (int64) | ✓ (number) | -| `nack` | ✓ | ✓ | ✓ | -| `ticker` / `Ticker` / `ticker`, `ticker_all` / `TickerAll` / `tickerAll` | ✓ | ✓ | ✓ | -| `force_next_tick` / `ForceNextTick` / `forceNextTick` | ✓ | ✓ | ✓ | -| `nack` retry delay + reason options | ✓ | ✓ | ✓ | -| High-level `Consumer` | ✓ | ✓ | ✓ | -| Consumer wakeup model | polling + optional LISTEN/NOTIFY wakeup | polling | polling | -| `Consumer` poll interval option | ✓ | ✓ | ✓ | -| `Consumer` max-messages option | ✓ | ✓ | ✓ | -| `Consumer` retry delay option | ✓ | ✓ | ✓ | -| Unknown-type behavior avoids silent ack | ✓ | ✓ | ✓ | -| Configurable unknown-type policy | ✓ | ✓ | ✓ | -| `subscribe` / `unsubscribe` wrappers | ✓ | ✓ | ✓ | -| Cooperative consumers (experimental) [^coop] | ✓ | ✓ | ✓ | +| Capability | Python | Go | TypeScript | Ruby | +| --- | :---: | :---: | :---: | :---: | +| `connect` / `close` | ✓ | ✓ | ✓ | ✓ | +| Raw SQL escape hatch | ✓ (`conn`) | ✓ (`Pool()`) | ✓ (`rawPool`) | ✓ (`conn`) | +| PgQue-classified errors | ✓ | ✓ | ✓ | ✓ | +| Lossless PostgreSQL `bigint` IDs | ✓ (`int`) | ✓ (`int64`) | ✓ (`bigint`) | ✓ (`Integer`) | +| `send` | ✓ | ✓ | ✓ | ✓ | +| `send_batch` / `SendBatch` / `sendBatch` | ✓ | ✓ | ✓ | ✓ | +| `receive` | ✓ | ✓ | ✓ | ✓ | +| `ack` returns SQL rowcount (0 stale, 1 success) | ✓ (int) | ✓ (int64) | ✓ (number) | ✓ (Integer) | +| `nack` | ✓ | ✓ | ✓ | ✓ | +| `ticker` / `Ticker` / `ticker`, `ticker_all` / `TickerAll` / `tickerAll` | ✓ | ✓ | ✓ | ✓ | +| `force_next_tick` / `ForceNextTick` / `forceNextTick` | ✓ | ✓ | ✓ | ✓ | +| `nack` retry delay + reason options | ✓ | ✓ | ✓ | ✓ | +| High-level `Consumer` | ✓ | ✓ | ✓ | ✓ | +| Consumer wakeup model | polling + optional LISTEN/NOTIFY wakeup | polling | polling | polling + LISTEN/NOTIFY wakeup | +| `Consumer` poll interval option | ✓ | ✓ | ✓ | ✓ | +| `Consumer` max-messages option | ✓ | ✓ | ✓ | ✓ | +| `Consumer` retry delay option | ✓ | ✓ | ✓ | ✓ | +| Unknown-type behavior avoids silent ack | ✓ | ✓ | ✓ | ✓ | +| Configurable unknown-type policy | ✓ | ✓ | ✓ | ✓ | +| `subscribe` / `unsubscribe` wrappers | ✓ | ✓ | ✓ | ✓ | +| Cooperative consumers (experimental) [^coop] | ✓ | ✓ | ✓ | ✓ | Legend: ✓ supported by the client API on `main`; ✗ not exposed as a first-class client API. Lower-level SQL primitives remain available through raw -connection/pool escape hatches. Python, Go, and TypeScript expose ticker -convenience wrappers. +connection/pool escape hatches. [^coop]: Experimental. Each supporting client exposes `subscribe_subconsumer` / `unsubscribe_subconsumer` / `receive_coop` / diff --git a/clients/ruby/.gitignore b/clients/ruby/.gitignore new file mode 100644 index 0000000..b6303c5 --- /dev/null +++ b/clients/ruby/.gitignore @@ -0,0 +1,5 @@ +/Gemfile.lock +/pkg/ +/.bundle/ +/coverage/ +*.gem diff --git a/clients/ruby/Gemfile b/clients/ruby/Gemfile new file mode 100644 index 0000000..b4e2a20 --- /dev/null +++ b/clients/ruby/Gemfile @@ -0,0 +1,3 @@ +source "https://rubygems.org" + +gemspec diff --git a/clients/ruby/LICENSE b/clients/ruby/LICENSE new file mode 100644 index 0000000..62a462a --- /dev/null +++ b/clients/ruby/LICENSE @@ -0,0 +1,191 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to the Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by the Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding any notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2026 Nikolay Samokhvalov + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/clients/ruby/README.md b/clients/ruby/README.md new file mode 100644 index 0000000..74b9cd6 --- /dev/null +++ b/clients/ruby/README.md @@ -0,0 +1,270 @@ +# pgque + +Ruby client for [PgQue](https://github.com/NikolayS/pgque) — the PgQ-based +universal PostgreSQL queue. Thin wrapper over `pgque-api` SQL functions: +`send`, `send_batch`, `subscribe`, `unsubscribe`, `receive`, `ack`, +`nack`, `ticker`, `ticker_all`, `force_next_tick`, plus a polling +`Consumer` with `LISTEN`/`NOTIFY` wakeup. + +## Install + +```bash +gem install pgque --pre +``` + +`--pre` is required while v0.3.0 is in release-candidate; the latest +published version is `0.3.0.rc.1`. Pin the exact version if you prefer: + +```ruby +gem "pgque", "0.3.0.rc.1" +``` + +Requires Ruby 3.1+ and PostgreSQL 14+ with the PgQue schema installed +(`\i pgque.sql` — no extension required). + +## Database permissions + +The connecting database role needs `pgque_reader` to consume (`receive`, +`ack`, `nack`, `subscribe`, `unsubscribe`) and `pgque_writer` to produce +(`send`, `send_batch`). The two are **siblings** — neither inherits the +other. An app that both produces and consumes (the typical case for code +using this client) must be granted **both** roles: + +```sql +grant pgque_reader to your_app_user; +grant pgque_writer to your_app_user; +``` + +See [`docs/reference.md` — Roles and grants](../../docs/reference.md#roles-and-grants) +for the full role table. + +## Quickstart + +```ruby +require "pgque" + +Pgque.connect("postgresql://localhost/mydb") do |client| + # one-time setup (typically in a migration) + client.conn.exec("select pgque.create_queue('orders')") + client.subscribe("orders", "order_worker") + + # producer + event_id = client.send("orders", { "order_id" => 42 }, type: "order.created") + batch_ids = client.send_batch("orders", "order.created", [ + { "order_id" => 43 }, + { "order_id" => 44 }, + ]) + puts "#{event_id} #{batch_ids.inspect}" +end + +# consumer (separate process / thread) +consumer = Pgque::Consumer.new( + "postgresql://localhost/mydb", + queue: "orders", + name: "order_worker", +) + +consumer.on("order.created") { |msg| process_order(msg.payload) } + +# Optional: catch-all handler for types with no specific handler. +# Without it, messages with unhandled types are nacked by default +# (sent to retry_queue, or to the dead-letter queue once +# queue_max_retries is exhausted). Register a "*" handler to take +# explicit control. +consumer.on("*") { |msg| log_unhandled(msg.type, msg.payload) } + +consumer.start # blocks until SIGTERM / SIGINT +``` + +The consumer only sees events after `pgque.ticker()` has materialized +a batch. With `pg_cron` available, run `select pgque.start();` once +to schedule the default 10 ticks/sec. Without `pg_cron`, drive +ticking from your application or an external scheduler — see the +project [Installation](https://github.com/NikolayS/pgque#installation) +section for both paths. + +### Consumer options + +`Consumer.new(..., max_messages: ...)` controls the per-`receive` limit. +The default is PostgreSQL's `int` maximum, so the consumer requests +the whole PgQ batch before acknowledging it. `ack` finishes the +entire underlying PgQ batch, including rows beyond `max_messages`; +only lower this value when it is at least as large as the queue's +worst-case batch size, otherwise rows past the limit are silently +skipped by the batch ack. + +Other options: `poll_interval:` (seconds between polls when no +`LISTEN`/`NOTIFY` arrives, default 30), `retry_after:` (seconds before +nacked messages are retried, default 60), and `logger:` (a `Logger` +instance; the default targets `$stderr` at `FATAL`, so the consumer is +effectively silent unless you set `PGQUE_LOG_LEVEL=warn` or pass your +own). + +### Handling unknown event types + +By default the consumer **nacks** any message whose type has no +registered handler and no `"*"` catch-all. The message is retried (or +dead-lettered once `queue_max_retries` is exhausted) so unknown types +are never silently dropped. + +To ack unknown types instead, pass `unknown_handler_policy: "ack"`: + +```ruby +consumer = Pgque::Consumer.new( + "postgresql://localhost/mydb", + queue: "orders", + name: "order_worker", + unknown_handler_policy: "ack", # log WARNING and ack; do not nack +) +``` + +## Experimental: cooperative consumers + +> **Experimental in PgQue 0.2.** Function names, edge-case behavior, and +> client API shape may change before this feature is marked stable. Do +> not use this as the only processing path for critical workloads +> without idempotent handlers and stale-worker takeover tests. + +Cooperative consumers let several worker processes share **one logical +consumer**. Each batch is handed to exactly one subconsumer; the main +row owns the group cursor, member rows own active batches. See +[`docs/reference.md` — Cooperative consumers / subconsumers](../../docs/reference.md#cooperative-consumers--subconsumers) +for the SQL surface. + +Two-worker example (each worker holds its own connection / process): + +```ruby +require "pgque" + +# worker-1 +c1 = Pgque::Consumer.new( + "postgresql://localhost/mydb", + queue: "orders", + name: "order_worker", + subconsumer: "worker-1", + dead_interval: "5 minutes", # optional: take over a stale sibling +) + +c1.on("order.created") { |msg| process(msg) } + +c1.start # in a second process: subconsumer: "worker-2" +``` + +`Consumer.new(subconsumer: ...)` switches the poll loop to +`receive_coop` and uses the cooperative cursor. `dead_interval:` is +only valid in cooperative mode; passing it without `subconsumer:` +raises `ArgumentError`. + +The low-level methods on `Pgque::Client` are also available for direct +use: + +```ruby +client.subscribe_subconsumer("orders", "order_worker", "worker-1") +msgs = client.receive_coop( + "orders", "order_worker", "worker-1", + max_messages: 100, dead_interval: "5 minutes", +) +client.ack(msgs[0].batch_id) +client.touch_subconsumer("orders", "order_worker", "worker-1") +client.unsubscribe_subconsumer( + "orders", "order_worker", "worker-1", batch_handling: 1, +) +``` + +`unsubscribe_subconsumer(..., batch_handling: 0)` (the default) raises +if the subconsumer holds an active batch; pass `batch_handling: 1` to +route active messages through retry/DLQ before removal. + +## Manual ticking + +For tests, demos, or manual operation without `pg_cron`, use +`client.force_next_tick(queue)` to force the **next** ticker call to +materialize a tick. It does not insert the tick itself: + +```ruby +client.force_next_tick("orders") +client.ticker("orders") +``` + +`client.ticker_all` runs the global ticker across all eligible queues +and returns the number of queues that received a tick. + +## Transactions + +`send` → ticker → `receive` must each run in its own committed +transaction (PgQue is snapshot-based). Ruby's `pg` gem runs each +statement in its own implicit transaction by default — the equivalent +of psycopg's `autocommit=True` — so the snippets above already commit +between phases without any explicit `BEGIN`/`COMMIT`. + +To group several statements into one transaction (for example, to +publish a batch atomically with surrounding bookkeeping), wrap them in +a `transaction` block on the underlying `PG::Connection`: + +```ruby +client.conn.transaction do + client.send("orders", { "order_id" => 42 }) + bookkeeping(client.conn) +end # commits here; raises rollback the whole block +``` + +Don't wrap `send` and `receive` in one explicit transaction; same for +`maint_retry_events` + `ticker`. See the +[snapshot rule](https://github.com/NikolayS/pgque/blob/main/docs/pgq-concepts.md#snapshot-rule). +The built-in `Pgque::Consumer` already wraps `receive` + dispatch + +`ack` in a single `conn.transaction` per poll, so handler code does +not need to manage that. + +## A note on `Pgque::Client#send` + +The producer method is called `send` to mirror the SQL surface +(`pgque.send(queue, payload)`) and the Python/TS clients. That name +shadows Ruby's `Object#send`, which is widely used for reflective +method invocation. This means `client.send(:close)` calls the SQL +`send`, **not** the `close` method. + +Two well-known Ruby escape hatches restore reflective dispatch on a +`Pgque::Client` instance: + +```ruby +client.__send__(:close) # canonical "always works" form +client.public_send(:close) # safer: respects visibility +``` + +Use `__send__` or `public_send` whenever you need to call a method on +a `Pgque::Client` by name. The Pgque API itself never calls these +internally. + +## Tests + +Integration tests require a running PostgreSQL with the PgQue schema +installed. Set `PGQUE_TEST_DSN` and run rake: + +```bash +PGQUE_TEST_DSN=postgresql://postgres:pgque_test@localhost/pgque_test \ + bundle exec rake test +``` + +Without `PGQUE_TEST_DSN`, the tests skip. + +## Distribution + +The RubyGems distribution is `pgque`; require it as: + +```ruby +require "pgque" +``` + +See [RELEASE.md](RELEASE.md) for publishing steps. + +## More + +- Schema install, full reference, tutorial: + +- Per-function SQL reference: + +- Issues: + +## License + +Apache-2.0. Copyright 2026 Nikolay Samokhvalov. diff --git a/clients/ruby/RELEASE.md b/clients/ruby/RELEASE.md new file mode 100644 index 0000000..204e6b0 --- /dev/null +++ b/clients/ruby/RELEASE.md @@ -0,0 +1,127 @@ +# Ruby client release + +Gem name: `pgque` on RubyGems.org. + +```bash +gem install pgque --pre # while v0.3.0 is in release-candidate +``` + +```ruby +require "pgque" +``` + +## Versioning + +The Ruby client version is independent from the SQL/server +`pgque.version()`. Bump this gem when the Ruby API or packaging changes; +server-only SQL changes do not require a Ruby client release. + +Use Ruby gem version strings in `clients/ruby/lib/pgque/version.rb`. For a +pre-release build, use dot-separated suffixes like `0.2.0.rc.1`, +`0.2.0.alpha.1`, or `0.2.0.beta`; do **not** use Git-style `0.2.0-dev` +hyphens, which `Gem::Version` parses but other tooling does not. +RubyGems treats any version containing a non-numeric segment as a +pre-release; users need `gem install pgque --pre` to receive it. + +## Bootstrap (first publish only) + +RubyGems' Trusted Publishing requires the gem to **already exist** on +the registry before a trusted publisher can be configured. The very +first release is therefore manual: + +```bash +cd clients/ruby +gem build pgque.gemspec +gem signin # one-time, prompts for rubygems.org credentials +gem push pgque-0.3.0.rc.1.gem +``` + +After that, every subsequent release goes through the workflow below. + +## GitHub environment prerequisite + +Before the first workflow-driven publish, create a GitHub environment +in `NikolayS/pgque`: + +- `rubygems` + +Protect it as appropriate for releases (for example, required reviewers +and `main` branch restrictions). The workflow also checks that it is +running from `main`, but environment protection is the human approval +gate. + +## RubyGems Trusted Publisher prerequisite + +After the bootstrap publish, configure Trusted Publishing on +rubygems.org: + +1. Sign in to rubygems.org and open the gem's page. +2. **Settings → Trusted Publishers → Add Publisher**. +3. Provider: GitHub Actions. +4. Repository: `NikolayS/pgque`. +5. Workflow: `release-ruby.yml`. +6. Environment: `rubygems`. + +Pin to a specific tag/branch only if you want to lock down which refs +can publish; otherwise leave the ref restriction empty. + +## Release process + +The release workflow is `.github/workflows/release-ruby.yml`. + +1. Update `clients/ruby/lib/pgque/version.rb` and any release notes / + changelog if present. +2. Merge the release prep PR. +3. Ensure the `rubygems` GitHub environment exists and is protected. +4. Ensure the gem already exists on RubyGems and Trusted Publishing + is configured (bootstrap section above). +5. Run **Release Ruby client** with `dry_run=true` first. Dry runs + only build, validate the version match, and smoke-install the + resulting `.gem`; they do not require the `rubygems` environment + approval or OIDC permissions. +6. Run it with `dry_run=false`. Approve the `rubygems` environment + when prompted. +7. Verify the published artifact installs in a clean environment: + + ```bash + gem install pgque --pre # or pin: gem install pgque -v 0.3.0.rc.1 + ruby -rpgque -e 'puts Pgque::VERSION' + ``` + +The workflow builds with `gem build`, smoke-installs the resulting +`.gem` against a temporary `GEM_HOME`, and publishes via RubyGems +Trusted Publishing / OIDC. No long-lived `RUBYGEMS_API_KEY` is +needed. + +The publish step uses `rubygems/release-gem@v1`, which runs +`bundle exec rake release`. That task (provided by +`require "bundler/gem_tasks"` in `clients/ruby/Rakefile`) chains: + +1. `rake build` — builds `pgque-${VERSION}.gem` under `pkg/`. +2. `release:guard_clean` — refuses to release if the working tree + has uncommitted changes (CI checkouts are clean). +3. `release:source_control_push` — annotates the head commit with a + `v${VERSION}` tag and pushes that tag to `origin`. The + `contents: write` permission on the publish job, plus the + `GITHUB_TOKEN` automatically injected by `actions/checkout`, is + what authorizes the push. **The release workflow therefore + pushes a git tag to `NikolayS/pgque` as a side effect.** If you + need to retract a release, yank the gem on RubyGems *and* delete + the tag with `git push --delete origin v${VERSION}`. +4. `release:rubygem_push` — `gem push pkg/pgque-${VERSION}.gem`. + +If the gem push fails after the tag has already been pushed (rare +but possible if rubygems.org is degraded), you'll have a `v${VERSION}` +tag with no corresponding published gem. Re-running the workflow +will then fail at `release:guard_clean` if the tag already exists; +delete the tag and re-dispatch. + +## Why no test registry? + +Unlike PyPI's TestPyPI sibling, RubyGems.org has no public staging +instance. Dry-run validation in this workflow covers `gem build` and +local install verification; the next step is the real publish. If you +need an isolated end-to-end test for the publish path itself, push to +a privately-owned alias gem (e.g. `pgque-staging`) using the same +workflow with a different gemspec name, then drop the alias gem when +you're done. diff --git a/clients/ruby/Rakefile b/clients/ruby/Rakefile new file mode 100644 index 0000000..ecb388b --- /dev/null +++ b/clients/ruby/Rakefile @@ -0,0 +1,12 @@ +require "bundler/gem_tasks" +require "rake/testtask" + +Rake::TestTask.new(:test) do |t| + t.libs << "lib" + t.libs << "test" + t.test_files = FileList["test/test_*.rb"] + t.warning = false + t.verbose = true +end + +task default: :test diff --git a/clients/ruby/lib/pgque.rb b/clients/ruby/lib/pgque.rb new file mode 100644 index 0000000..d6741c8 --- /dev/null +++ b/clients/ruby/lib/pgque.rb @@ -0,0 +1,35 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. +# PgQue includes code derived from PgQ (ISC license, +# Marko Kreen / Skype Technologies OU). + +require "json" +require "time" +require "pg" + +require "pgque/version" +require "pgque/errors" +require "pgque/event" +require "pgque/message" +require "pgque/client" +require "pgque/consumer" + +module Pgque + # Open a connection and return a Pgque::Client. + # + # Ruby's pg gem runs each statement in its own implicit transaction + # by default -- the equivalent of psycopg's autocommit=True. To group + # statements into one transaction, use conn.transaction { ... } on the + # underlying PG::Connection (client.conn). There is no autocommit + # flag because Ruby pg has no per-connection autocommit attribute to + # toggle; transaction control is per-call via the transaction block. + def self.connect(dsn) + client = Client.connect(dsn) + return client unless block_given? + + begin + yield client + ensure + client.close + end + end +end diff --git a/clients/ruby/lib/pgque/client.rb b/clients/ruby/lib/pgque/client.rb new file mode 100644 index 0000000..8b5dcd4 --- /dev/null +++ b/clients/ruby/lib/pgque/client.rb @@ -0,0 +1,283 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. +# PgQue includes code derived from PgQ (ISC license, +# Marko Kreen / Skype Technologies OU). + +module Pgque + # Thin wrapper over the pgque SQL functions. + # + # Note: Pgque::Client#send mirrors the SQL `pgque.send(queue, payload)` + # primitive and the Python/TS client surface. That name shadows + # Ruby's Object#send, so use #__send__ or #public_send when you need + # to invoke a method on a Pgque::Client instance reflectively. + class Client + attr_reader :conn + + def self.connect(dsn) + conn = PG.connect(dsn) + new(conn, owns_conn: true) + rescue PG::ConnectionBad => e + raise ConnectionError, e.message + end + + def initialize(conn, owns_conn: false) + @conn = conn + @owns_conn = owns_conn + end + + def close + return unless @owns_conn + return if @conn.finished? + @conn.close + end + + def send(queue, payload, type: "default") + if payload.is_a?(Event) + type = payload.type + payload = payload.payload + end + encoded = encode_payload(payload) + result = + if custom_type?(type) + @conn.exec_params( + "select pgque.send($1, $2, $3::jsonb)", + [queue, type, encoded], + ) + else + @conn.exec_params( + "select pgque.send($1, $2::jsonb)", + [queue, encoded], + ) + end + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def send_batch(queue, type, payloads) + encoded = payloads.map { |p| encode_payload(p) } + array_literal = pg_text_array(encoded) + result = @conn.exec_params( + "select unnest(pgque.send_batch($1, $2, $3::jsonb[]))", + [queue, type, array_literal], + ) + integer_column(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def receive(queue, consumer, max_messages = 100) + result = @conn.exec_params( + "select * from pgque.receive($1, $2, $3)", + [queue, consumer, max_messages], + ) + result.each_row.map { |row| row_to_message(row) } + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def ack(batch_id) + result = @conn.exec_params("select pgque.ack($1)", [batch_id]) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def subscribe(queue, consumer) + result = @conn.exec_params( + "select pgque.subscribe($1, $2)", [queue, consumer] + ) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def unsubscribe(queue, consumer) + result = @conn.exec_params( + "select pgque.unsubscribe($1, $2)", [queue, consumer] + ) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def force_next_tick(queue) + result = @conn.exec_params("select pgque.force_next_tick($1)", [queue]) + v = scalar(result) + v.nil? || v.empty? ? nil : v.to_i + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def ticker(queue) + result = @conn.exec_params("select pgque.ticker($1)", [queue]) + v = scalar(result) + v.nil? || v.empty? ? nil : v.to_i + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def ticker_all + result = @conn.exec_params("select pgque.ticker()", []) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + # Experimental: function names, edge-case behavior, and signatures may + # change before the cooperative API is marked stable. + def subscribe_subconsumer(queue, consumer, subconsumer) + result = @conn.exec_params( + "select pgque.subscribe_subconsumer($1, $2, $3)", + [queue, consumer, subconsumer], + ) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def unsubscribe_subconsumer(queue, consumer, subconsumer, batch_handling: 0) + result = @conn.exec_params( + "select pgque.unsubscribe_subconsumer($1, $2, $3, $4)", + [queue, consumer, subconsumer, batch_handling], + ) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def receive_coop(queue, consumer, subconsumer, max_messages: 100, + dead_interval: nil) + result = @conn.exec_params( + "select * from pgque.receive_coop($1, $2, $3, $4, $5::interval)", + [queue, consumer, subconsumer, max_messages, dead_interval], + ) + result.each_row.map { |row| row_to_message(row) } + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def touch_subconsumer(queue, consumer, subconsumer) + result = @conn.exec_params( + "select pgque.touch_subconsumer($1, $2, $3)", + [queue, consumer, subconsumer], + ) + integer_scalar(result) + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + def nack(batch_id, msg, retry_after: 60, reason: nil) + payload_str = case msg.payload + when Hash, Array then JSON.dump(msg.payload) + when nil then "null" + else msg.payload.to_s + end + created_at_str = format_created_at(msg.created_at) + + @conn.exec_params( + "select pgque.nack($1, " \ + "ROW($2, $3, $4, $5::jsonb, $6, $7, $8, $9, $10, $11)::pgque.message, " \ + "$12::interval, $13)", + [ + batch_id, msg.msg_id, msg.batch_id, msg.type, payload_str, + msg.retry_count, created_at_str, + msg.extra1, msg.extra2, msg.extra3, msg.extra4, + "#{retry_after} seconds", reason, + ], + ) + nil + rescue PG::Error => e + raise_wrapped_sql_error(e) + end + + private + + # Hash/Array: JSON-encoded. + # nil: literal "null" so ::jsonb yields JSON null (not SQL NULL). + # String: passed through verbatim; caller must supply valid JSON text. + # Anything else (Integer, Float, true, false, Symbol, ...): coerced + # via #to_s so numerics and booleans round-trip naturally + # (42 -> "42", true -> "true"). Symbols and other objects whose + # to_s isn't valid JSON will surface a SQL error from the ::jsonb + # cast -- callers who care should pre-encode with JSON.dump. + def encode_payload(payload) + case payload + when Hash, Array then JSON.dump(payload) + when nil then "null" + when String then payload + else payload.to_s + end + end + + def pg_text_array(strings) + escaped = strings.map do |s| + inner = s.to_s.gsub('\\') { '\\\\' }.gsub('"') { '\\"' } + "\"#{inner}\"" + end + "{#{escaped.join(',')}}" + end + + def row_to_message(row) + Message.new( + msg_id: row[0].to_i, + batch_id: row[1].to_i, + type: row[2], + payload: parse_jsonb(row[3]), + retry_count: row[4].nil? ? nil : row[4].to_i, + created_at: row[5].nil? ? nil : Time.parse(row[5]), + extra1: row[6], + extra2: row[7], + extra3: row[8], + extra4: row[9], + ) + end + + def parse_jsonb(text) + return nil if text.nil? + JSON.parse(text) + rescue JSON::ParserError + text + end + + def wrap_sql_error(error) + msg = error.message.to_s + low = msg.downcase + if low.include?("queue not found") + QueueNotFound.new(msg) + elsif low.include?("batch not found") + BatchNotFound.new(msg) + else + Error.new(msg) + end + end + + def raise_wrapped_sql_error(error) + wrapped = wrap_sql_error(error) + wrapped.set_backtrace(error.backtrace) if error.backtrace + raise wrapped, cause: error + end + + def scalar(result) + result.getvalue(0, 0) + end + + def integer_scalar(result) + scalar(result).to_i + end + + def integer_column(result) + result.column_values(0).map(&:to_i) + end + + def custom_type?(type) + !type.to_s.empty? && type != "default" + end + + def format_created_at(value) + case value + when Time then value.iso8601(6) + else value + end + end + end +end diff --git a/clients/ruby/lib/pgque/consumer.rb b/clients/ruby/lib/pgque/consumer.rb new file mode 100644 index 0000000..e2a13b5 --- /dev/null +++ b/clients/ruby/lib/pgque/consumer.rb @@ -0,0 +1,273 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. +# PgQue includes code derived from PgQ (ISC license, +# Marko Kreen / Skype Technologies OU). + +require "logger" + +module Pgque + class Consumer + DEFAULT_MAX_MESSAGES = 2_147_483_647 + WAIT_SLICE_SECONDS = 0.5 + + attr_reader :dsn, :queue, :name, :poll_interval, :max_messages, + :retry_after, :subconsumer, :dead_interval + + attr_accessor :logger + + def initialize(dsn, queue:, name:, poll_interval: 30, + max_messages: DEFAULT_MAX_MESSAGES, retry_after: 60, + unknown_handler_policy: "nack", subconsumer: nil, + dead_interval: nil, logger: nil) + @dsn = dsn + @queue = queue + @name = name + @poll_interval = poll_interval + @max_messages = max_messages + @retry_after = retry_after + + unless ["nack", "ack"].include?(unknown_handler_policy.to_s) + raise ArgumentError, + "unknown_handler_policy must be 'nack' or 'ack', " \ + "got #{unknown_handler_policy.inspect}" + end + @unknown_handler_policy = unknown_handler_policy.to_s + + if dead_interval && subconsumer.nil? + raise ArgumentError, + "dead_interval is only valid in cooperative mode " \ + "(set subconsumer:)" + end + @subconsumer = subconsumer + @dead_interval = dead_interval + + @handlers = {} + @default_handler = nil + # @running is a plain boolean. Ruby integer/boolean assignment + # is atomic, and the only cross-thread interactions are the + # signal trap and Consumer#stop flipping it false while the + # main loop polls running? -- no ordering dependencies, so a + # mutex would be overkill (and unsafe to enter from a signal + # trap, which raises ThreadError on Mutex#synchronize). + @running = false + @stop_signum = nil + @logger = logger || default_logger + end + + def on(event_type, &block) + raise ArgumentError, "block required for Consumer#on" unless block + + if event_type == "*" + @default_handler = block + else + @handlers[event_type] = block + end + block + end + + def start + @running = true + @stop_signum = nil + + in_main_thread = (Thread.current == Thread.main) + original_handlers = {} + + # Signal traps run in a restricted context: Mutex#synchronize, + # Logger#info, and most blocking code raise ThreadError. Keep + # this proc to plain instance-variable writes; the main loop + # logs the signal number after waking up. + stop_proc = ->(signum) { + @stop_signum = signum + @running = false + } + + if in_main_thread + ["TERM", "INT"].each do |sig| + original_handlers[sig] = Signal.trap(sig) { stop_proc.call(sig) } + end + end + + begin + conn = PG.connect(@dsn) + begin + channel = "pgque_#{@queue}" + conn.exec("LISTEN #{conn.escape_identifier(channel)}") + @logger.info( + "consumer #{@name} listening on #{@queue} (poll=#{@poll_interval}s)" + ) + + while running? + poll_once(conn) + break unless running? + wait_for_notify_or_stop(conn) + end + + if @stop_signum + @logger.info("received signal #{@stop_signum}, shutting down") + end + ensure + conn.close unless conn.finished? + end + ensure + # Clear running? before logging so callers observing the flag + # see "stopped" by the time the log line is written -- and so + # an exception during PG.connect, LISTEN, or the poll loop + # leaves the consumer in a consistent state instead of a + # ghost "running" with no live worker. Plain instance-var + # write -- not the trap-context-unsafe pattern. + @running = false + if in_main_thread + original_handlers.each { |sig, h| Signal.trap(sig, h || "DEFAULT") } + end + @logger.info("consumer #{@name} stopped") + end + end + + def stop + @running = false + end + + def running? + @running + end + + # Public for testability; not part of the stable API. + def poll_once(conn) + conn.transaction do + client = Client.new(conn) + msgs = + if @subconsumer + client.receive_coop( + @queue, @name, @subconsumer, + max_messages: @max_messages, + dead_interval: @dead_interval, + ) + else + client.receive(@queue, @name, @max_messages) + end + + next if msgs.empty? + + batch_id = msgs[0].batch_id + @logger.debug("batch #{batch_id}: #{msgs.size} message(s)") + + nack_failed = dispatch_batch(client, batch_id, msgs) + + next if nack_failed + + rowcount = client.ack(batch_id) + if rowcount == 0 + @logger.warn( + "pgque: ack batch #{batch_id} returned 0 -- stale or " \ + "double ack (batch already finished or not found)", + ) + end + end + end + + private + + def dispatch_batch(client, batch_id, msgs) + nack_failed = false + msgs.each do |msg| + handler = @handlers[msg.type] || @default_handler + + if handler.nil? + if @unknown_handler_policy == "ack" + @logger.warn( + "no handler for event type=#{msg.type} ev_id=#{msg.msg_id}; " \ + "acking", + ) + next + end + @logger.warn( + "no handler for event type=#{msg.type} ev_id=#{msg.msg_id}; " \ + "nacking", + ) + begin + client.nack(batch_id, msg, retry_after: @retry_after, + reason: "no handler for type=#{msg.type}") + rescue StandardError => e + nack_failed = true + @logger.error( + "nack failed for unhandled msg_id=#{msg.msg_id}: " \ + "#{e.class}: #{e.message}", + ) + end + next + end + + begin + handler.call(msg) + rescue StandardError => e + @logger.error( + "handler failed for msg_id=#{msg.msg_id}: " \ + "#{e.class}: #{e.message}", + ) + begin + client.nack(batch_id, msg, retry_after: @retry_after) + rescue StandardError => e2 + nack_failed = true + @logger.error( + "nack failed for msg_id=#{msg.msg_id}: " \ + "#{e2.class}: #{e2.message}", + ) + end + end + end + nack_failed + end + + def wait_for_notify_or_stop(conn) + drained = false + while conn.notifies + drained = true + end + return if drained + + deadline = monotonic + @poll_interval + while running? + remaining = deadline - monotonic + return if remaining <= 0 + + slice = [WAIT_SLICE_SECONDS, remaining].min + notification = conn.wait_for_notify(slice) + return unless running? + + if notification + while conn.notifies + # drain any queued notifications + end + return + end + end + end + + def monotonic + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + # The default logger is effectively silent: it targets $stderr (so + # messages never collide with application stdout) and ships at level + # FATAL, which the consumer never emits. Set PGQUE_LOG_LEVEL=warn (or + # info, debug, error) to see warnings/info from the consumer, or + # pass logger: Logger.new(...) to Consumer.new for full control. + def default_logger + log = Logger.new($stderr) + log.progname = "pgque.consumer.#{@name}" + log.level = env_log_level || Logger::FATAL + log + end + + def env_log_level + raw = ENV["PGQUE_LOG_LEVEL"] + return nil if raw.nil? + + normalized = raw.strip.upcase + return nil if normalized.empty? + + Logger.const_get(normalized) + rescue NameError + nil + end + end +end diff --git a/clients/ruby/lib/pgque/errors.rb b/clients/ruby/lib/pgque/errors.rb new file mode 100644 index 0000000..0db8cde --- /dev/null +++ b/clients/ruby/lib/pgque/errors.rb @@ -0,0 +1,13 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +module Pgque + class Error < StandardError; end + + class ConnectionError < Error; end + + class QueueNotFound < Error; end + + class BatchNotFound < Error; end + + class ConsumerNotFound < Error; end +end diff --git a/clients/ruby/lib/pgque/event.rb b/clients/ruby/lib/pgque/event.rb new file mode 100644 index 0000000..4e78a63 --- /dev/null +++ b/clients/ruby/lib/pgque/event.rb @@ -0,0 +1,13 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +module Pgque + class Event + attr_reader :payload, :type, :extra + + def initialize(payload:, type: "default", extra: {}) + @payload = payload + @type = type + @extra = extra + end + end +end diff --git a/clients/ruby/lib/pgque/message.rb b/clients/ruby/lib/pgque/message.rb new file mode 100644 index 0000000..d5a299c --- /dev/null +++ b/clients/ruby/lib/pgque/message.rb @@ -0,0 +1,23 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +module Pgque + class Message + attr_reader :msg_id, :batch_id, :type, :payload, :retry_count, + :created_at, :extra1, :extra2, :extra3, :extra4 + + def initialize(msg_id:, batch_id:, type:, payload:, retry_count:, + created_at:, extra1: nil, extra2: nil, extra3: nil, + extra4: nil) + @msg_id = msg_id + @batch_id = batch_id + @type = type + @payload = payload + @retry_count = retry_count + @created_at = created_at + @extra1 = extra1 + @extra2 = extra2 + @extra3 = extra3 + @extra4 = extra4 + end + end +end diff --git a/clients/ruby/lib/pgque/version.rb b/clients/ruby/lib/pgque/version.rb new file mode 100644 index 0000000..9ce00d7 --- /dev/null +++ b/clients/ruby/lib/pgque/version.rb @@ -0,0 +1,5 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +module Pgque + VERSION = "0.3.0.rc.1" +end diff --git a/clients/ruby/pgque.gemspec b/clients/ruby/pgque.gemspec new file mode 100644 index 0000000..8672525 --- /dev/null +++ b/clients/ruby/pgque.gemspec @@ -0,0 +1,33 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "lib/pgque/version" + +Gem::Specification.new do |spec| + spec.name = "pgque" + spec.version = Pgque::VERSION + spec.authors = ["Nikolay Samokhvalov", "Dalto Curvelano Jr"] + spec.email = ["nik@postgres.ai", "daltojr@gmail.com"] + + spec.summary = "Ruby client for PgQue -- PgQ Universal Edition" + spec.description = "Thin Ruby wrapper over the pgque SQL API: send, " \ + "send_batch, receive, ack, nack, force_next_tick, " \ + "plus a polling Consumer with LISTEN/NOTIFY wakeup." + spec.homepage = "https://github.com/NikolayS/pgque" + spec.license = "Apache-2.0" + spec.required_ruby_version = ">= 3.1.0" + + spec.metadata["homepage_uri"] = spec.homepage + spec.metadata["source_code_uri"] = spec.homepage + spec.metadata["bug_tracker_uri"] = "#{spec.homepage}/issues" + spec.metadata["documentation_uri"] = "#{spec.homepage}/blob/main/docs/reference.md" + spec.metadata["changelog_uri"] = "#{spec.homepage}/releases" + + spec.files = Dir.glob("lib/**/*.rb") + + %w[README.md LICENSE].select { |f| File.exist?(f) } + spec.require_paths = ["lib"] + + spec.add_dependency "pg", ">= 1.5", "< 2.0" + + spec.add_development_dependency "minitest", "~> 5.0" + spec.add_development_dependency "rake", "~> 13.0" +end diff --git a/clients/ruby/test/test_concurrency.rb b/clients/ruby/test/test_concurrency.rb new file mode 100644 index 0000000..8d35507 --- /dev/null +++ b/clients/ruby/test/test_concurrency.rb @@ -0,0 +1,36 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestConcurrency < Minitest::Test + include PgqueTest::Helpers + + def test_concurrent_producers_no_id_collisions + with_queue do |queue, _consumer, _conn| + n_threads = 4 + per_thread = 25 + seen_ids = [] + seen_lock = Mutex.new + + threads = n_threads.times.map do + Thread.new do + ids = [] + Pgque.connect(dsn) do |client| + per_thread.times do |i| + ids << client.send( + queue, + { "thread" => Thread.current.object_id, "i" => i }, + ) + end + end + seen_lock.synchronize { seen_ids.concat(ids) } + end + end + threads.each { |t| refute_nil t.join(30), "producer thread hung" } + + assert_equal n_threads * per_thread, seen_ids.size + assert_equal seen_ids.size, seen_ids.uniq.size, + "duplicate event ids: #{seen_ids - seen_ids.uniq}" + end + end +end diff --git a/clients/ruby/test/test_connect.rb b/clients/ruby/test/test_connect.rb new file mode 100644 index 0000000..217a431 --- /dev/null +++ b/clients/ruby/test/test_connect.rb @@ -0,0 +1,61 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestConnect < Minitest::Test + include PgqueTest::Helpers + + def test_connect_returns_client + client = Pgque.connect(dsn) + assert_instance_of Pgque::Client, client + refute client.conn.finished? + client.close + assert client.conn.finished? + end + + def test_connect_block_form_closes_on_exit + captured = nil + Pgque.connect(dsn) do |client| + captured = client + refute client.conn.finished? + end + assert captured.conn.finished? + end + + def test_external_conn_is_not_closed_by_close + raw = PG.connect(dsn) + begin + client = Pgque::Client.new(raw) + client.close + refute raw.finished? + ensure + raw.close + end + end + + def test_close_is_idempotent + client = Pgque.connect(dsn) + client.close + client.close + end + + def test_underscore_send_dispatches_methods_reflectively + # Pgque::Client#send shadows Object#send; __send__ and public_send + # remain the way to invoke methods reflectively on a client. + client = Pgque.connect(dsn) + client.__send__(:close) + assert client.conn.finished? + + client2 = Pgque.connect(dsn) + client2.public_send(:close) + assert client2.conn.finished? + end +end + +class TestConnectBadDsn < Minitest::Test + def test_connect_bad_dsn_raises_pgque_connection_error + assert_raises(Pgque::ConnectionError) do + Pgque.connect("postgresql://nobody:wrong@localhost:1/nonexistent_db_xyz") + end + end +end diff --git a/clients/ruby/test/test_consumer.rb b/clients/ruby/test/test_consumer.rb new file mode 100644 index 0000000..bedf327 --- /dev/null +++ b/clients/ruby/test/test_consumer.rb @@ -0,0 +1,392 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" +require "logger" +require "stringio" + +class TestConsumerUnit < Minitest::Test + include PgqueTest::Helpers + + class FakeTxConn + def transaction + yield self + end + end + + class SpyClient + attr_reader :receive_calls + + def initialize(*) + @receive_calls = [] + end + + def receive(queue, consumer, max_messages) + @receive_calls << [queue, consumer, max_messages] + [] + end + + def ack(_batch_id) + 1 + end + + def nack(*); end + end + + def test_consumer_default_max_messages_requests_whole_batch + cons = Pgque::Consumer.new(dsn, queue: "q", name: "c") + assert_equal 2_147_483_647, cons.max_messages + end + + def test_consumer_configured_max_messages_is_preserved + cons = Pgque::Consumer.new(dsn, queue: "q", name: "c", max_messages: 123) + assert_equal 123, cons.max_messages + end + + def test_consumer_poll_once_passes_default_max_messages + cons = Pgque::Consumer.new(dsn, queue: "q", name: "c") + spy = SpyClient.new + Pgque::Client.stub :new, ->(*) { spy } do + cons.poll_once(FakeTxConn.new) + end + assert_equal [["q", "c", 2_147_483_647]], spy.receive_calls + end + + def test_consumer_poll_once_passes_configured_max_messages + cons = Pgque::Consumer.new(dsn, queue: "q", name: "c", max_messages: 123) + spy = SpyClient.new + Pgque::Client.stub :new, ->(*) { spy } do + cons.poll_once(FakeTxConn.new) + end + assert_equal [["q", "c", 123]], spy.receive_calls + end + + def test_consumer_rejects_invalid_unknown_handler_policy + skip_dsn_for_this_class! + assert_raises(ArgumentError) do + Pgque::Consumer.new("dummy", queue: "q", name: "c", + unknown_handler_policy: "bogus") + end + end + + def test_consumer_dead_interval_without_subconsumer_raises + skip_dsn_for_this_class! + assert_raises(ArgumentError) do + Pgque::Consumer.new("dummy", queue: "q", name: "c", + dead_interval: "5 minutes") + end + end + + def test_running_clears_after_start_failure + skip_dsn_for_this_class! + # Point at an unreachable port so PG.connect raises immediately + # inside start, exiting before the poll loop ever runs. + cons = Pgque::Consumer.new( + "postgresql://nobody:wrong@localhost:1/nodb", + queue: "q", name: "c", logger: silent_logger + ) + refute cons.running? + assert_raises(PG::Error) { cons.start } + refute cons.running?, + "consumer.running? must be false after start raised" + end + + def silent_logger + require "logger" + require "stringio" + log = Logger.new(StringIO.new) + log.level = Logger::FATAL + log + end + + def test_invalid_pgque_log_level_falls_back_to_fatal + skip_dsn_for_this_class! + old = ENV["PGQUE_LOG_LEVEL"] + ENV["PGQUE_LOG_LEVEL"] = " warning " + + cons = Pgque::Consumer.new("dummy", queue: "q", name: "c") + assert_equal Logger::FATAL, cons.logger.level + ensure + ENV["PGQUE_LOG_LEVEL"] = old + end + + private + + # Some unit tests don't actually connect; allow them even without DSN. + def skip_dsn_for_this_class! + # The setup-level skip already passes when DSN is set; this method is + # here so the structure is symmetric with the integration tests. + end +end + +class TestConsumerIntegration < Minitest::Test + include PgqueTest::Helpers + + def run_consumer_for(consumer, seconds) + t = Thread.new { consumer.start } + Thread.new do + sleep seconds + consumer.stop + end + t + end + + def force_tick(conn, queue) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + end + + def silent_logger + log = Logger.new(StringIO.new) + log.level = Logger::FATAL + log + end + + def capturing_logger + io = StringIO.new + log = Logger.new(io) + log.level = Logger::WARN + [log, io] + end + + def retry_count_for_msg(conn, queue, msg_id) + conn.exec_params( + "select count(*) from pgque.retry_queue rq " \ + "join pgque.queue q on q.queue_id = rq.ev_queue " \ + "where q.queue_name = $1 and rq.ev_id = $2", + [queue, msg_id], + ).values[0][0].to_i + end + + def dlq_count_for_msg(conn, queue, msg_id) + conn.exec_params( + "select count(*) from pgque.dead_letter dl " \ + "join pgque.queue q on q.queue_id = dl.dl_queue_id " \ + "where q.queue_name = $1 and dl.ev_id = $2", + [queue, msg_id], + ).values[0][0].to_i + end + + def test_consumer_dispatches_by_event_type + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "i" => 1 }, type: "evt.a") + client.send(queue, { "i" => 2 }, type: "evt.b") + force_tick(conn, queue) + + seen_a = [] + seen_b = [] + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, logger: silent_logger) + cons.on("evt.a") { |m| seen_a << m.payload } + cons.on("evt.b") { |m| seen_b << m.payload } + + run_consumer_for(cons, 3.0).join(5.0) + + assert_equal 1, seen_a.size + assert_equal 1, seen_b.size + end + end + + def test_consumer_default_handler_catches_unknown + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "x" => 99 }, type: "never.registered.type") + force_tick(conn, queue) + + fallback = [] + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, logger: silent_logger) + cons.on("*") { |m| fallback << m } + + run_consumer_for(cons, 3.0).join(5.0) + + assert_equal 1, fallback.size + assert_equal "never.registered.type", fallback[0].type + end + end + + def test_consumer_nacks_on_handler_error + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "i" => 1 }, type: "evt.fail") + force_tick(conn, queue) + + n_calls = 0 + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, retry_after: 0, + logger: silent_logger) + cons.on("evt.fail") { |_m| n_calls += 1; raise "simulated failure" } + + run_consumer_for(cons, 3.0).join(5.0) + + assert_operator n_calls, :>=, 1 + cnt = conn.exec_params( + "select count(*) from pgque.retry_queue rq " \ + "join pgque.queue q on q.queue_id = rq.ev_queue " \ + "where q.queue_name = $1", + [queue], + ).values[0][0].to_i + assert_operator cnt, :>=, 1 + end + end + + def test_consumer_nacks_unhandled_event_type + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + msg_id = client.send(queue, { "x" => 1 }, type: "totally.unregistered.type") + force_tick(conn, queue) + + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, logger: silent_logger) + run_consumer_for(cons, 3.0).join(5.0) + + rq = retry_count_for_msg(conn, queue, msg_id) + dlq = dlq_count_for_msg(conn, queue, msg_id) + assert_operator rq + dlq, :>=, 1, + "unhandled event was not nacked: rq=#{rq} dlq=#{dlq}" + + force_tick(conn, queue) + follow = client.receive(queue, consumer_n, 10) + refute(follow.any? { |m| m.msg_id == msg_id }, + "batch did not advance past unhandled msg_id") + client.ack(follow[0].batch_id) if follow.any? + end + end + + def test_consumer_acks_unhandled_event_type_when_opt_in + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + msg_id = client.send(queue, { "x" => 1 }, type: "totally.unregistered.type") + force_tick(conn, queue) + + log, io = capturing_logger + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, + unknown_handler_policy: "ack", + logger: log) + run_consumer_for(cons, 3.0).join(5.0) + + assert_equal 0, retry_count_for_msg(conn, queue, msg_id) + assert_equal 0, dlq_count_for_msg(conn, queue, msg_id) + assert_includes io.string, "totally.unregistered.type" + + force_tick(conn, queue) + follow = client.receive(queue, consumer_n, 10) + refute(follow.any? { |m| m.msg_id == msg_id }) + client.ack(follow[0].batch_id) if follow.any? + end + end + + def test_consumer_stop_returns_promptly + with_queue do |queue, consumer_n, _conn| + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 10, logger: silent_logger) + t = Thread.new { cons.start } + sleep 0.5 + cons.stop + finished = t.join(15) + refute_nil finished, "consumer did not stop after stop()" + end + end + + def test_consumer_stop_returns_within_2s_while_waiting + with_queue do |queue, consumer_n, _conn| + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 30, logger: silent_logger) + t = Thread.new { cons.start } + sleep 1.0 + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + cons.stop + t.join(5.0) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + + refute t.alive?, "consumer thread did not stop" + assert_operator elapsed, :<, 2.0, + "stop() took #{elapsed.round(2)}s; expected <2s" + end + end + + def test_consumer_wakes_on_pg_notify_before_poll_interval + with_queue do |queue, consumer_n, conn| + received = [] + received_evt = Mutex.new + received_cv = ConditionVariable.new + + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 30, logger: silent_logger) + cons.on("evt.wake") do |m| + received_evt.synchronize do + received << m + received_cv.signal + end + end + + t = Thread.new { cons.start } + sleep 1.5 + + t_send = Process.clock_gettime(Process::CLOCK_MONOTONIC) + producer = PG.connect(dsn) + begin + client = Pgque::Client.new(producer) + client.send(queue, { "i" => 1 }, type: "evt.wake") + producer.exec_params("select pgque.force_next_tick($1)", [queue]) + producer.exec_params("select pgque.ticker($1)", [queue]) + producer.exec_params("notify pgque_#{queue}, 'go'") + ensure + producer.close + end + + woke = false + received_evt.synchronize do + deadline = Time.now + 5 + until received.any? || Time.now >= deadline + received_cv.wait(received_evt, deadline - Time.now) + end + woke = received.any? + end + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t_send + + cons.stop + t.join(5.0) + + assert woke, "consumer did not wake on pg_notify within 5s" + assert_equal 1, received.size + assert_operator elapsed, :<, 5.0, + "consumer woke too slowly (#{elapsed.round(2)}s)" + end + end + + def test_consumer_partial_batch_acks_good_messages_only + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + ok1 = client.send(queue, { "i" => 1 }, type: "ok") + boom = client.send(queue, { "i" => 2 }, type: "boom") + ok2 = client.send(queue, { "i" => 3 }, type: "ok") + force_tick(conn, queue) + + seen_ok = [] + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, retry_after: 3600, + logger: silent_logger) + cons.on("ok") { |m| seen_ok << m.msg_id } + cons.on("boom") { |_| raise "handler boom" } + + run_consumer_for(cons, 3.0).join(5.0) + + assert_includes seen_ok, ok1 + assert_includes seen_ok, ok2 + + assert_operator retry_count_for_msg(conn, queue, boom), :>=, 1 + assert_equal 0, retry_count_for_msg(conn, queue, ok1) + assert_equal 0, retry_count_for_msg(conn, queue, ok2) + + force_tick(conn, queue) + follow = client.receive(queue, consumer_n, 10) + ids = follow.map(&:msg_id) + refute_includes ids, ok1 + refute_includes ids, ok2 + client.ack(follow[0].batch_id) if follow.any? + end + end +end diff --git a/clients/ruby/test/test_consumer_listen_stop.rb b/clients/ruby/test/test_consumer_listen_stop.rb new file mode 100644 index 0000000..85d7167 --- /dev/null +++ b/clients/ruby/test/test_consumer_listen_stop.rb @@ -0,0 +1,84 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +# Regression tests for the LISTEN/NOTIFY wait: stop() must take effect +# promptly, and a real NOTIFY must wake the wait well before +# poll_interval expires. + +require_relative "test_helper" +require "logger" +require "stringio" + +class TestConsumerListenStop < Minitest::Test + include PgqueTest::Helpers + + def silent_logger + log = Logger.new(StringIO.new) + log.level = Logger::FATAL + log + end + + def test_stop_is_honored_promptly + with_queue do |queue, consumer_n, _conn| + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 10, logger: silent_logger) + t = Thread.new { cons.start } + sleep 1.0 + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + cons.stop + t.join(3.5) + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + + refute t.alive?, + "consumer thread still alive #{elapsed.round(2)}s after stop()" + assert_operator elapsed, :<, 3.0, + "stop() took #{elapsed.round(2)}s; expected <3s" + end + end + + def test_notify_wakes_consumer_before_poll_interval + with_queue do |queue, consumer_n, _conn| + seen = [] + handler_called = Mutex.new + handler_cv = ConditionVariable.new + + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 10, logger: silent_logger) + cons.on("evt.wake") do |m| + handler_called.synchronize do + seen << m.payload + handler_cv.signal + end + end + + t = Thread.new { cons.start } + begin + sleep 1.0 + + producer = PG.connect(dsn) + begin + client = Pgque::Client.new(producer) + client.send(queue, { "v" => 1 }, type: "evt.wake") + producer.exec_params("select pgque.force_next_tick($1)", [queue]) + producer.exec_params("select pgque.ticker($1)", [queue]) + ensure + producer.close + end + + woke = false + handler_called.synchronize do + deadline = Time.now + 3.0 + until seen.any? || Time.now >= deadline + handler_cv.wait(handler_called, deadline - Time.now) + end + woke = seen.any? + end + assert woke, "handler not invoked within 3s; NOTIFY did not wake" + assert_equal 1, seen.size + ensure + cons.stop + t.join(3.0) + end + end + end +end diff --git a/clients/ruby/test/test_coop.rb b/clients/ruby/test/test_coop.rb new file mode 100644 index 0000000..edd5ad8 --- /dev/null +++ b/clients/ruby/test/test_coop.rb @@ -0,0 +1,245 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +# Experimental cooperative-consumers API. Function names, edge-case +# behavior, and client API shape may change before this feature is +# marked stable. + +require_relative "test_helper" +require "logger" +require "stringio" + +module CoopHelpers + def with_coop_queue + conn = PG.connect(PGQUE_TEST_DSN) + q = unique_queue_name + conn.exec_params("select pgque.create_queue($1)", [q]) + yield q, conn + ensure + if conn && !conn.finished? + # Reset failed transaction state before cleanup, mirroring + # with_queue. Otherwise a test that leaves the connection in + # PQTRANS_INERROR will make unsubscribe/drop fail and leak state. + conn.exec("ROLLBACK") rescue nil + begin + rows = conn.exec_params( + "select c.co_name from pgque.consumer c " \ + "join pgque.subscription s on s.sub_consumer = c.co_id " \ + "join pgque.queue qq on qq.queue_id = s.sub_queue " \ + "where qq.queue_name = $1 and s.sub_role = 'coop_member'", + [q], + ).values.map { |r| r[0] } + rows.each do |co_name| + parent, sep, sub = co_name.rpartition(".") + next if sep.empty? || parent.empty? || sub.empty? + conn.exec_params( + "select pgque.unsubscribe_subconsumer($1, $2, $3, 1)", + [q, parent, sub], + ) + end + conn.exec_params("select pgque.drop_queue($1, true)", [q]) + rescue PG::Error + # best-effort cleanup + end + conn.close + end + end + + def tick(conn, queue) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + end + + def silent_logger + log = Logger.new(StringIO.new) + log.level = Logger::FATAL + log + end +end + +class TestCoop < Minitest::Test + include PgqueTest::Helpers + include CoopHelpers + + def consumer_n + @consumer_n ||= unique_consumer_name + end + + def test_subscribe_subconsumer_returns_1_then_0 + with_coop_queue do |q, conn| + client = Pgque::Client.new(conn) + first = client.subscribe_subconsumer(q, consumer_n, "worker-1") + second = client.subscribe_subconsumer(q, consumer_n, "worker-1") + assert_equal 1, first + assert_equal 0, second + end + end + + def test_receive_coop_returns_messages_and_ack_finishes + with_coop_queue do |q, conn| + client = Pgque::Client.new(conn) + client.subscribe_subconsumer(q, consumer_n, "worker-1") + client.send(q, { "k" => 1 }, type: "evt.a") + client.send(q, { "k" => 2 }, type: "evt.a") + tick(conn, q) + + msgs = client.receive_coop(q, consumer_n, "worker-1", max_messages: 10) + assert_equal 2, msgs.size + ks = msgs.map { |m| m.payload["k"] }.sort + assert_equal [1, 2], ks + + client.ack(msgs[0].batch_id) + + follow = client.receive_coop(q, consumer_n, "worker-1", max_messages: 10) + assert_equal [], follow + end + end + + def test_two_subconsumers_split_batches_no_duplicates + with_coop_queue do |q, conn| + Pgque.connect(dsn) do |producer| + producer.subscribe_subconsumer(q, consumer_n, "worker-1") + producer.subscribe_subconsumer(q, consumer_n, "worker-2") + 6.times { |i| producer.send(q, { "i" => i }, type: "evt") } + producer.conn.exec_params("select pgque.force_next_tick($1)", [q]) + producer.conn.exec_params("select pgque.ticker($1)", [q]) + end + + # Ruby pg runs each exec_params as its own implicit transaction, so + # the FOR UPDATE lock taken by receive_coop drops as soon as the + # call returns -- no autocommit flag needed (cf. psycopg's + # autocommit=True in the Python equivalent test). + Pgque.connect(dsn) do |c1| + Pgque.connect(dsn) do |c2| + m1 = c1.receive_coop(q, consumer_n, "worker-1", max_messages: 100) + m2 = c2.receive_coop(q, consumer_n, "worker-2", max_messages: 100) + + ids1 = m1.map(&:msg_id) + ids2 = m2.map(&:msg_id) + assert_empty ids1 & ids2, + "member-1 and member-2 saw same msg_ids: #{ids1 & ids2}" + assert_operator m1.size + m2.size, :>=, 1 + + c1.ack(m1[0].batch_id) if m1.any? + c2.ack(m2[0].batch_id) if m2.any? + end + end + + Pgque.connect(dsn) do |cleanup| + cleanup.unsubscribe_subconsumer(q, consumer_n, "worker-1", + batch_handling: 1) + cleanup.unsubscribe_subconsumer(q, consumer_n, "worker-2", + batch_handling: 1) + end + end + end + + def test_unsubscribe_subconsumer_with_active_batch_default_raises + with_coop_queue do |q, conn| + client = Pgque::Client.new(conn) + client.subscribe_subconsumer(q, consumer_n, "worker-1") + client.send(q, { "i" => 1 }, type: "evt") + tick(conn, q) + + msgs = client.receive_coop(q, consumer_n, "worker-1") + assert_equal 1, msgs.size + + assert_raises(Pgque::Error) do + client.unsubscribe_subconsumer(q, consumer_n, "worker-1") + end + conn.exec("rollback") rescue nil + + rv = client.unsubscribe_subconsumer(q, consumer_n, "worker-1", + batch_handling: 1) + assert_equal 1, rv + end + end + + def test_unsubscribe_subconsumer_routes_active_messages_through_retry + with_coop_queue do |q, conn| + client = Pgque::Client.new(conn) + client.subscribe_subconsumer(q, consumer_n, "worker-1") + client.send(q, { "i" => 1 }, type: "evt") + tick(conn, q) + + msgs = client.receive_coop(q, consumer_n, "worker-1") + assert_equal 1, msgs.size + + rv = client.unsubscribe_subconsumer(q, consumer_n, "worker-1", + batch_handling: 1) + assert_equal 1, rv + end + end + + def test_touch_subconsumer_returns_1_on_registered_row + with_coop_queue do |q, conn| + client = Pgque::Client.new(conn) + client.subscribe_subconsumer(q, consumer_n, "worker-1") + rv = client.touch_subconsumer(q, consumer_n, "worker-1") + assert_equal 1, rv + end + end + + def test_consumer_coop_dispatches_and_acks + with_coop_queue do |q, conn| + client = Pgque::Client.new(conn) + client.subscribe_subconsumer(q, consumer_n, "worker-1") + msg_id = client.send(q, { "x" => 1 }, type: "evt.coop") + tick(conn, q) + + seen = [] + cons = Pgque::Consumer.new(dsn, + queue: q, name: consumer_n, + subconsumer: "worker-1", + poll_interval: 1, + logger: silent_logger) + cons.on("evt.coop") { |m| seen << m } + + t = Thread.new { cons.start } + Thread.new do + sleep 3.0 + cons.stop + end + t.join(5.0) + + assert_equal 1, seen.size + assert_equal msg_id, seen[0].msg_id + + follow = client.receive_coop(q, consumer_n, "worker-1") + assert_equal [], follow + + client.unsubscribe_subconsumer(q, consumer_n, "worker-1", + batch_handling: 1) + end + end + + def test_consumer_without_subconsumer_unchanged + with_queue do |queue, c_name, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "v" => 1 }, type: "evt.normal") + tick(conn, queue) + + seen = [] + cons = Pgque::Consumer.new(dsn, + queue: queue, name: c_name, + poll_interval: 1, + logger: silent_logger) + cons.on("evt.normal") { |m| seen << m } + + t = Thread.new { cons.start } + Thread.new do + sleep 3.0 + cons.stop + end + t.join(5.0) + + assert_equal 1, seen.size + end + end + + def test_consumer_dead_interval_without_subconsumer_raises + assert_raises(ArgumentError) do + Pgque::Consumer.new(dsn, queue: "q", name: "c", + dead_interval: "5 minutes") + end + end +end diff --git a/clients/ruby/test/test_helper.rb b/clients/ruby/test/test_helper.rb new file mode 100644 index 0000000..df1efa6 --- /dev/null +++ b/clients/ruby/test/test_helper.rb @@ -0,0 +1,56 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require "minitest/autorun" +require "securerandom" +require "pgque" + +PGQUE_TEST_DSN = ENV["PGQUE_TEST_DSN"] + +module PgqueTest + module Helpers + def setup + skip "PGQUE_TEST_DSN not set" unless PGQUE_TEST_DSN + super if defined?(super) + end + + def dsn + PGQUE_TEST_DSN + end + + def unique_queue_name + base = name.to_s.gsub(/[^a-z0-9_]/i, "_") + "rbt_#{base[0, 40]}_#{SecureRandom.hex(4)}" + end + + def unique_consumer_name + base = name.to_s.gsub(/[^a-z0-9_]/i, "_") + "rbt_c_#{base[0, 38]}_#{SecureRandom.hex(4)}" + end + + def with_queue + conn = PG.connect(PGQUE_TEST_DSN) + q = unique_queue_name + c = unique_consumer_name + conn.exec_params("select pgque.create_queue($1)", [q]) + conn.exec_params("select pgque.register_consumer($1, $2)", [q, c]) + yield q, c, conn + ensure + if conn && !conn.finished? + # Reset the connection's transaction state before cleaning up. + # If the test body left the conn in a failed transaction (an + # in-flight assertion failure after a SQL error, for example) + # any subsequent query is rejected until the transaction is + # rolled back -- which would silently break drop_queue and leak + # the test queue across runs. + conn.exec("ROLLBACK") rescue nil + begin + conn.exec_params("select pgque.unregister_consumer($1, $2)", [q, c]) if q && c + conn.exec_params("select pgque.drop_queue($1, true)", [q]) if q + rescue PG::Error + # cleanup is best-effort + end + conn.close + end + end + end +end diff --git a/clients/ruby/test/test_nack.rb b/clients/ruby/test/test_nack.rb new file mode 100644 index 0000000..55502de --- /dev/null +++ b/clients/ruby/test/test_nack.rb @@ -0,0 +1,75 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestNack < Minitest::Test + include PgqueTest::Helpers + + def enqueue_and_receive(client, queue, consumer, payload, conn) + client.send(queue, payload) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size + msgs[0] + end + + def test_nack_routes_to_retry_queue + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + msg = enqueue_and_receive(client, queue, consumer, { "k" => "retry" }, conn) + client.nack(msg.batch_id, msg, retry_after: 0) + client.ack(msg.batch_id) + + retry_count = conn.exec_params( + "select count(*) from pgque.retry_queue rq " \ + "join pgque.queue q on q.queue_id = rq.ev_queue " \ + "where q.queue_name = $1", + [queue], + ).values[0][0].to_i + assert_equal 1, retry_count + + dlq_count = conn.exec_params( + "select count(*) from pgque.dead_letter dl " \ + "join pgque.queue q on q.queue_id = dl.dl_queue_id " \ + "where q.queue_name = $1", + [queue], + ).values[0][0].to_i + assert_equal 0, dlq_count + end + end + + def test_nack_routes_to_dlq_at_max_retries + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + conn.exec_params( + "update pgque.queue set queue_max_retries = 0 where queue_name = $1", + [queue], + ) + + msg = enqueue_and_receive(client, queue, consumer, { "k" => "doomed" }, conn) + client.nack(msg.batch_id, msg, retry_after: 0, reason: "poison pill") + client.ack(msg.batch_id) + + dlq_count = conn.exec_params( + "select count(*) from pgque.dead_letter dl " \ + "join pgque.queue q on q.queue_id = dl.dl_queue_id " \ + "where q.queue_name = $1", + [queue], + ).values[0][0].to_i + assert_equal 1, dlq_count + end + end + + def test_nack_invalid_batch_raises + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + msg = enqueue_and_receive(client, queue, consumer, { "x" => 1 }, conn) + client.ack(msg.batch_id) + + assert_raises(Pgque::Error) do + client.nack(msg.batch_id, msg, retry_after: 0) + end + end + end +end diff --git a/clients/ruby/test/test_receive.rb b/clients/ruby/test/test_receive.rb new file mode 100644 index 0000000..b0b9a77 --- /dev/null +++ b/clients/ruby/test/test_receive.rb @@ -0,0 +1,89 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestReceive < Minitest::Test + include PgqueTest::Helpers + + def test_receive_empty_when_no_tick + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "a" => 1 }) + msgs = client.receive(queue, consumer, 10) + assert_equal [], msgs + end + end + + def test_receive_returns_messages_after_tick + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "key" => "value" }) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size + m = msgs[0] + refute_nil m.batch_id + refute_nil m.msg_id + assert_equal "default", m.type + assert_equal({ "key" => "value" }, m.payload) + end + end + + def test_ack_advances_position + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "k" => 1 }) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size + client.ack(msgs[0].batch_id) + msgs2 = client.receive(queue, consumer, 10) + assert_equal [], msgs2 + end + end + + def test_receive_returns_at_most_max_messages + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + 5.times { |i| client.send(queue, { "i" => i }) } + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 3) + assert_equal 3, msgs.size + client.ack(msgs[0].batch_id) + end + end + + def test_receive_preserves_event_type + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "a" => 1 }, type: "evt.alpha") + client.send(queue, { "b" => 2 }, type: "evt.beta") + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + types = msgs.map(&:type).sort + assert_equal ["evt.alpha", "evt.beta"], types + client.ack(msgs[0].batch_id) + end + end + + def test_message_timestamp_round_trip + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + before = Time.now.utc - 5 + client.send(queue, { "x" => 1 }) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + after = Time.now.utc + 5 + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size + assert_kind_of Time, msgs[0].created_at + assert_operator msgs[0].created_at, :>=, before + assert_operator msgs[0].created_at, :<=, after + client.ack(msgs[0].batch_id) + end + end +end diff --git a/clients/ruby/test/test_send.rb b/clients/ruby/test/test_send.rb new file mode 100644 index 0000000..5a2342c --- /dev/null +++ b/clients/ruby/test/test_send.rb @@ -0,0 +1,220 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestSend < Minitest::Test + include PgqueTest::Helpers + + def test_send_returns_int_event_id + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + eid = client.send(queue, { "order_id" => 42 }) + assert_kind_of Integer, eid + assert_operator eid, :>, 0 + end + end + + def test_send_with_explicit_type + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + eid = client.send(queue, { "id" => 1 }, type: "order.created") + assert_kind_of Integer, eid + end + end + + def test_send_event_object + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + event = Pgque::Event.new(payload: { "x" => 1 }, type: "custom.t") + eid = client.send(queue, event) + assert_kind_of Integer, eid + end + end + + def test_send_str_payload_passes_through + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + eid = client.send(queue, '"plain string"') + assert_kind_of Integer, eid + end + end + + def test_send_nil_payload + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + eid = client.send(queue, nil) + assert_kind_of Integer, eid + end + end + + def test_send_numeric_and_boolean_payloads_coerce_via_to_s + # Non-String/Hash/Array/nil payloads run through to_s so numerics + # and booleans round-trip naturally as JSON scalars. + cases = [ + [42, 42], + [3.14, 3.14], + [true, true], + [false, false], + ] + cases.each do |payload, expected| + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, payload) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size, "no message for #{payload.inspect}" + assert_equal expected, msgs[0].payload, + "#{payload.inspect} did not round-trip" + client.ack(msgs[0].batch_id) + end + end + end + + def test_send_batch_returns_ids_in_order + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + ids = client.send_batch(queue, "batch.test", [ + { "n" => 1 }, { "n" => 2 }, { "n" => 3 }, { "n" => 4 } + ]) + assert_equal 4, ids.size + assert ids.all? { |i| i.is_a?(Integer) } + assert_equal ids.sort, ids + end + end + + def test_send_unicode_payload + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + payload = { "text" => "héllo wörld 🎉 — ünicode тест" } + client.send(queue, payload) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size + assert_equal payload, msgs[0].payload + client.ack(msgs[0].batch_id) + end + end + + def test_send_large_payload + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + big = { "data" => "x" * 100_000 } + client.send(queue, big) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size + assert_equal big, msgs[0].payload + client.ack(msgs[0].batch_id) + end + end + + def test_jsonb_payload_round_trip + cases = [ + [{ "key" => "val", "n" => 1 }, { "key" => "val", "n" => 1 }], + [[1, "two", nil], [1, "two", nil]], + ['"just a string"', "just a string"], + ["42", 42], + ["null", nil], + ] + cases.each do |payload, expected| + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, payload) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size, "no message for payload=#{payload.inspect}" + if expected.nil? + assert_nil msgs[0].payload, + "payload=#{payload.inspect} did not round-trip to nil" + else + assert_equal expected, msgs[0].payload, + "payload=#{payload.inspect} did not round-trip" + end + client.ack(msgs[0].batch_id) + end + end + end + + def test_send_batch_mixed_payloads_preserve_order + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + payloads = [{ "a" => 1 }, nil, "42"] + expected = [{ "a" => 1 }, nil, 42] + ids = client.send_batch(queue, "batch.mixed", payloads) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal ids, msgs.map(&:msg_id) + assert_equal expected, msgs.map(&:payload) + client.ack(msgs[0].batch_id) + end + end + + def test_send_batch_nil_payload_produces_json_null + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + client.send_batch(queue, "default", [nil]) + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + msgs = client.receive(queue, consumer, 10) + assert_equal 1, msgs.size, "send_batch([nil]) should produce 1 message" + assert_nil msgs[0].payload, "payload must be JSON null, not SQL NULL" + client.ack(msgs[0].batch_id) + end + end + + def test_send_to_missing_queue_raises + conn = PG.connect(PGQUE_TEST_DSN) + begin + client = Pgque::Client.new(conn) + assert_raises(Pgque::Error) do + client.send("does_not_exist_xyz_12345", { "x" => 1 }) + end + ensure + conn.close + end + end +end + +class TestSendSqlForm < Minitest::Test + # Capture exec_params calls without a real DB. + class FakeConn + attr_reader :sql_used, :params_used + + def exec_params(sql, params) + @sql_used = sql + @params_used = params + FakeResult.new + end + + class FakeResult + def getvalue(_row, _col) + "999" + end + end + end + + def test_2arg_form_for_default_type + [nil, "", "default"].each do |type_val| + conn = FakeConn.new + client = Pgque::Client.new(conn) + eid = client.send("q", { "x" => 1 }, type: type_val) + assert_equal 999, eid + assert_includes conn.sql_used, "send($1, $2::jsonb)" + refute_includes conn.sql_used, "send($1, $2, $3::jsonb)", + "type=#{type_val.inspect} should use 2-arg form" + end + end + + def test_3arg_form_for_custom_type + conn = FakeConn.new + client = Pgque::Client.new(conn) + eid = client.send("q", { "x" => 1 }, type: "custom") + assert_equal 999, eid + assert_includes conn.sql_used, "send($1, $2, $3::jsonb)" + end +end diff --git a/clients/ruby/test/test_smoke.rb b/clients/ruby/test/test_smoke.rb new file mode 100644 index 0000000..18ca88a --- /dev/null +++ b/clients/ruby/test/test_smoke.rb @@ -0,0 +1,33 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestSmoke < Minitest::Test + include PgqueTest::Helpers + + def test_smoke_send_receive_ack + queue = unique_queue_name + consumer_n = unique_consumer_name + Pgque.connect(dsn) do |client| + client.conn.exec_params("select pgque.create_queue($1)", [queue]) + client.conn.exec_params("select pgque.subscribe($1, $2)", [queue, consumer_n]) + + begin + client.send(queue, { "hello" => "world" }, type: "smoke.test") + client.conn.exec_params("select pgque.force_next_tick($1)", [queue]) + client.conn.exec_params("select pgque.ticker($1)", [queue]) + + msgs = client.receive(queue, consumer_n, 10) + assert_equal 1, msgs.size + assert_equal "smoke.test", msgs[0].type + assert_equal({ "hello" => "world" }, msgs[0].payload) + + client.ack(msgs[0].batch_id) + ensure + client.conn.exec_params("select pgque.unregister_consumer($1, $2)", + [queue, consumer_n]) rescue nil + client.conn.exec_params("select pgque.drop_queue($1, true)", [queue]) rescue nil + end + end + end +end diff --git a/clients/ruby/test/test_subscribe.rb b/clients/ruby/test/test_subscribe.rb new file mode 100644 index 0000000..7d78626 --- /dev/null +++ b/clients/ruby/test/test_subscribe.rb @@ -0,0 +1,114 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestSubscribe < Minitest::Test + include PgqueTest::Helpers + + def test_subscribe_returns_one_for_new_then_zero_for_existing + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + fresh = "#{unique_consumer_name}_sub" + begin + first = client.subscribe(queue, fresh) + assert_equal 1, first, "first subscribe must return 1 for a fresh consumer" + second = client.subscribe(queue, fresh) + assert_equal 0, second, "second subscribe must return 0 (already registered)" + ensure + conn.exec_params( + "select pgque.unregister_consumer($1, $2)", [queue, fresh] + ) rescue nil + end + end + end + + def test_unsubscribe_returns_positive_for_existing_then_zero_for_missing + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + first = client.unsubscribe(queue, consumer) + assert_operator first, :>=, 1, + "first unsubscribe of a registered consumer must return >= 1" + second = client.unsubscribe(queue, consumer) + assert_equal 0, second, + "second unsubscribe must return 0 (no longer registered)" + end + end + + def test_subscribed_consumer_can_receive_messages + with_queue do |queue, _registered, conn| + client = Pgque::Client.new(conn) + fresh = "#{unique_consumer_name}_recv" + begin + client.subscribe(queue, fresh) + client.send(queue, { "x" => 1 }, type: "sub.test") + client.force_next_tick(queue) + client.ticker(queue) + msgs = client.receive(queue, fresh, 10) + assert_equal 1, msgs.size + assert_equal "sub.test", msgs[0].type + client.ack(msgs[0].batch_id) + ensure + conn.exec_params( + "select pgque.unregister_consumer($1, $2)", [queue, fresh] + ) rescue nil + end + end + end +end + +class TestSubscribeSqlForm < Minitest::Test + # Capture exec_params calls without a real DB. + class FakeConn + attr_reader :sql_used, :params_used + + def initialize(scalar:) + @scalar = scalar + end + + def exec_params(sql, params) + @sql_used = sql + @params_used = params + FakeResult.new(@scalar) + end + + class FakeResult + def initialize(value) + @value = value + end + + def getvalue(_row, _col) + @value + end + end + end + + def test_subscribe_issues_two_arg_sql_and_returns_integer + conn = FakeConn.new(scalar: "1") + client = Pgque::Client.new(conn) + n = client.subscribe("orders", "processor") + assert_equal 1, n + assert_includes conn.sql_used, "pgque.subscribe($1, $2)" + assert_equal ["orders", "processor"], conn.params_used + end + + def test_subscribe_returns_zero_when_already_registered + conn = FakeConn.new(scalar: "0") + client = Pgque::Client.new(conn) + assert_equal 0, client.subscribe("orders", "processor") + end + + def test_unsubscribe_issues_two_arg_sql_and_returns_integer + conn = FakeConn.new(scalar: "1") + client = Pgque::Client.new(conn) + n = client.unsubscribe("orders", "processor") + assert_equal 1, n + assert_includes conn.sql_used, "pgque.unsubscribe($1, $2)" + assert_equal ["orders", "processor"], conn.params_used + end + + def test_unsubscribe_returns_zero_when_not_subscribed + conn = FakeConn.new(scalar: "0") + client = Pgque::Client.new(conn) + assert_equal 0, client.unsubscribe("orders", "processor") + end +end diff --git a/clients/ruby/test/test_ticker.rb b/clients/ruby/test/test_ticker.rb new file mode 100644 index 0000000..8ab0dad --- /dev/null +++ b/clients/ruby/test/test_ticker.rb @@ -0,0 +1,96 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +require_relative "test_helper" + +class TestTicker < Minitest::Test + include PgqueTest::Helpers + + def test_ticker_after_force_next_tick_returns_non_nil_integer + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + client.send(queue, { "x" => 1 }, type: "tick.test") + client.force_next_tick(queue) + tick_id = client.ticker(queue) + refute_nil tick_id, "ticker after force_next_tick must produce a tick" + assert_kind_of Integer, tick_id + assert_operator tick_id, :>, 0 + end + end + + def test_ticker_returns_nil_when_no_new_tick_needed + with_queue do |queue, _consumer, conn| + client = Pgque::Client.new(conn) + client.force_next_tick(queue) + client.ticker(queue) + # Immediately again with no new activity: ticker returns nil. + assert_nil client.ticker(queue), + "second ticker call with no new events must return nil" + end + end + + def test_ticker_all_returns_non_negative_integer + Pgque.connect(dsn) do |client| + n = client.ticker_all + assert_kind_of Integer, n + assert_operator n, :>=, 0 + end + end +end + +class TestTickerSqlForm < Minitest::Test + # Capture exec_params calls without a real DB. + class FakeConn + attr_reader :sql_used, :params_used + + def initialize(scalar:) + @scalar = scalar + end + + def exec_params(sql, params) + @sql_used = sql + @params_used = params + FakeResult.new(@scalar) + end + + class FakeResult + def initialize(value) + @value = value + end + + def getvalue(_row, _col) + @value + end + end + end + + def test_ticker_issues_single_queue_sql + conn = FakeConn.new(scalar: "42") + client = Pgque::Client.new(conn) + tick_id = client.ticker("orders") + assert_equal 42, tick_id + assert_includes conn.sql_used, "pgque.ticker($1)" + assert_equal ["orders"], conn.params_used + end + + def test_ticker_returns_nil_for_nil_scalar + conn = FakeConn.new(scalar: nil) + client = Pgque::Client.new(conn) + assert_nil client.ticker("orders") + end + + def test_ticker_returns_nil_for_empty_scalar + conn = FakeConn.new(scalar: "") + client = Pgque::Client.new(conn) + assert_nil client.ticker("orders") + end + + def test_ticker_all_issues_zero_arg_sql + conn = FakeConn.new(scalar: "3") + client = Pgque::Client.new(conn) + n = client.ticker_all + assert_equal 3, n + assert_includes conn.sql_used, "pgque.ticker()" + refute_includes conn.sql_used, "$1" + assert_equal [], conn.params_used + end +end diff --git a/clients/ruby/test/test_transaction_visibility.rb b/clients/ruby/test/test_transaction_visibility.rb new file mode 100644 index 0000000..cf09ae0 --- /dev/null +++ b/clients/ruby/test/test_transaction_visibility.rb @@ -0,0 +1,72 @@ +# Copyright 2026 Nikolay Samokhvalov. Apache-2.0 license. + +# PgQ snapshot isolation: events committed in transaction T are only +# visible to a batch whose tick was taken after T committed. Collapsing +# send + force_next_tick + receive into one transaction violates that +# contract. These tests document and enforce it. + +require_relative "test_helper" +require "logger" +require "stringio" + +class TestTransactionVisibility < Minitest::Test + include PgqueTest::Helpers + + def silent_logger + log = Logger.new(StringIO.new) + log.level = Logger::FATAL + log + end + + def test_collapsed_transaction_returns_no_messages + with_queue do |queue, consumer, conn| + client = Pgque::Client.new(conn) + + conn.exec("BEGIN") + begin + client.send(queue, { "x" => 1 }, type: "collapsed.test") + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + # No commit between send and receive -- one transaction. + msgs = client.receive(queue, consumer, 10) + assert_equal 0, msgs.size, + "PgQ visibility violation: collapsed transaction " \ + "returned #{msgs.size} message(s); expected 0. " \ + "Add a commit between send and force_next_tick." + ensure + conn.exec("ROLLBACK") + end + end + end + + def test_unhandled_event_nack_assertion_catches_stale_cursor + with_queue do |queue, consumer_n, conn| + client = Pgque::Client.new(conn) + msg_id = client.send(queue, { "x" => 1 }, type: "totally.unregistered.type") + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + + cons = Pgque::Consumer.new(dsn, queue: queue, name: consumer_n, + poll_interval: 1, logger: silent_logger) + # Simulate a broken consumer that receives but neither acks nor nacks. + cons.define_singleton_method(:poll_once) { |_c| } + + t = Thread.new { cons.start } + sleep 2.0 + cons.stop + t.join(4.0) + + conn.exec_params("select pgque.force_next_tick($1)", [queue]) + conn.exec_params("select pgque.ticker($1)", [queue]) + follow = client.receive(queue, consumer_n, 10) + + assert(follow.any? { |m| m.msg_id == msg_id }, + "expected the unprocessed message to still be visible " \ + "(cursor did not advance because poll_once was a no-op), but " \ + "re-receive returned no rows. This indicates the batch cursor " \ + "advanced without an explicit ack -- a PgQ visibility violation.") + + client.ack(follow[0].batch_id) if follow.any? + end + end +end