diff --git a/.gitignore b/.gitignore index b4af75d88..8c7cfc3fb 100644 --- a/.gitignore +++ b/.gitignore @@ -31,5 +31,6 @@ node_modules .supabase config/prod.secret.exs demo/.env +examples/realtime-jsonb-filter-mre/.env .lexical .vscode diff --git a/README.md b/README.md index 74d8e3f6f..31ba31263 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,12 @@ alter publication supabase_realtime add table test; You can start playing around with Broadcast, Presence, and Postgres Changes features either with the client libs (e.g. `@supabase/realtime-js`), or use the built in Realtime Inspector on localhost, `http://localhost:4000/inspector/new` (make sure the port is correct for your development environment). +## Examples + +- [Realtime JSONB Filter Limitation (MRE)](./examples/realtime-jsonb-filter-mre) – Demonstrates why Realtime filters don't work on JSONB expressions, and shows the recommended dedicated column pattern. + +## WebSocket Connection + The WebSocket URL must contain the subdomain, `external_id` of the tenant on the `_realtime.tenants` table, and the token must be signed with the `jwt_secret` that was inserted along with the tenant. If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000/socket` (make sure the port is correct for your development environment), and you can use `eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MDMwMjgwODcsInJvbGUiOiJwb3N0Z3JlcyJ9.tz_XJ89gd6bN8MBpCl7afvPrZiBH6RB65iA1FadPT3Y` for the token. The token must have `exp` and `role` (database role) keys. diff --git a/examples/realtime-jsonb-filter-mre/.env.example b/examples/realtime-jsonb-filter-mre/.env.example new file mode 100644 index 000000000..7764385c1 --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/.env.example @@ -0,0 +1,2 @@ +SUPABASE_URL=https://YOUR_PROJECT.supabase.co +SUPABASE_ANON_KEY=YOUR_ANON_KEY diff --git a/examples/realtime-jsonb-filter-mre/README.md b/examples/realtime-jsonb-filter-mre/README.md new file mode 100644 index 000000000..fa7f806d7 --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/README.md @@ -0,0 +1,145 @@ +# Supabase Realtime JSONB Filter Limitation (MRE) + +## Problem + +Supabase Realtime `postgres_changes` filters only support direct column filters (e.g., `column=eq.value`). **JSONB expression filters are NOT supported**. + +### Expected Behavior +When filtering on a JSONB field expression like `data->>organization_id`, events should be delivered to matching subscribers (like PostgREST filters). + +### Actual Behavior +No events are received. The filter is silently ignored or rejected because Realtime does not evaluate SQL expressions. + +## Setup Required + +Before running the demo, you MUST apply the database migration. + +Steps: +1. Open Supabase dashboard +2. Confirm the project URL matches the `[DEBUG] SUPABASE_URL` printed by the demo +3. Go to SQL Editor +4. Copy contents of migration.sql +5. Run it +6. Go to Project Settings → API → Exposed schemas and add `pgboss` + +If not applied, you will see: + +```text +[ERROR] Database schema not found +``` + +## Reproduction Steps + +1. **Clone and setup:** + ```bash + npm install + cp .env.example .env + # Add SUPABASE_URL and SUPABASE_ANON_KEY + ``` + +2. **Apply migration:** + - Copy contents of `migration.sql` + - Run in your Supabase SQL editor + +3. **Run the demo:** + ```bash + npm start + ``` + +4. **Expected output:** + ``` + [SETUP] Supabase Realtime JSONB Filter MRE + [SUBSCRIBED] Ready to receive events + [INSERT] Creating job with JSONB data... + [WAIT] Waiting 8s for realtime event... + [subscription] event received: { ... } + [RESULT] ✅ PASS: Realtime event received with direct column filter + ``` + +See [expected-output.txt](expected-output.txt) for full example. + +## Root Cause + +Realtime does not evaluate or validate SQL expressions in filters. It only supports direct column equality: +- ✅ Supported: `column_name=eq.value` +- ❌ Not supported: `data->>'key'=eq.value` +- ❌ Not supported: `array_col[0]=eq.value` + +This is by design because Supabase Realtime filters operate on logical replication (WAL) changes and do not evaluate SQL expressions like JSONB operators. Keeping the filter layer simple and performant is essential for scaling to thousands of concurrent subscriptions. + +## Solution: Dedicated Column Pattern + +Instead of filtering on JSONB expressions, mirror critical fields into dedicated scalar columns: + +1. **Add column** – `organization_id TEXT` +2. **Backfill** – Extract from JSONB: `data->>'organization_id'` +3. **Sync with trigger** – Auto-update on INSERT/UPDATE +4. **Filter on column** – Use `organization_id=eq.value` +5. **Index** – Add for performance: `CREATE INDEX idx_job_organization_id` + +### Files + +- **migration.sql** – Creates schema, column, trigger, index, RLS policy +- **subscription-client.mjs** – Realtime subscription using correct filter +- **run-demo.mjs** – Demonstrates working filtered subscription +- **expected-output.txt** – Example successful run +- **package.json** – Dependencies + +## Key Code Changes + +### ❌ Broken (JSONB filter) +```javascript +.on('postgres_changes', { + schema: 'pgboss', + table: 'job', + filter: 'data->>organization_id=eq.org_123' // Does NOT work +}) +``` + +### ✅ Fixed (direct column filter) +```javascript +.on('postgres_changes', { + schema: 'pgboss', + table: 'job', + filter: 'organization_id=eq.org_123' // Works! +}) +``` + +### Database Trigger +```sql +create trigger sync_organization_id_trigger +before insert or update on pgboss.job +for each row +execute function sync_organization_id(); +``` + +The trigger keeps `organization_id` in sync from JSONB on every write. + +## Comparison + +| Filter Type | PostgREST | Realtime | Reason | +|---|---|---|---| +| Direct column | ✅ Works | ✅ Works | Both support basic equality | +| JSONB operator | ✅ Works | ❌ Fails | Realtime doesn't evaluate SQL expressions | +| Array access | ✅ Works | ❌ Fails | Requires query evaluation | +| Function call | ✅ Works | ❌ Fails | Requires query evaluation | + +**Why the difference?** +- **PostgREST**: Query-based API that evaluates full SQL expressions +- **Realtime**: Stream-based API that applies pattern matching on WAL replication events + +## Conclusion + +Supabase Realtime is a great real-time sync engine, but it's **not a query engine**. It only supports: +- Direct column filters +- Simple comparison operators (eq, neq, gt, gte, lt, lte, like, in) +- No SQL expressions or functions + +For complex filtering on JSONB data, use the **dedicated column pattern** demonstrated here. + +## Further Reading + +- [Supabase Realtime Docs](https://supabase.com/docs/guides/realtime) +- [Realtime Filters Documentation](https://supabase.com/docs/guides/realtime#postgres_changes-schema) +- [PostgREST Filters](https://postgrest.org/en/stable/references/api/tables_views.html#operators) + diff --git a/examples/realtime-jsonb-filter-mre/expected-output.txt b/examples/realtime-jsonb-filter-mre/expected-output.txt new file mode 100644 index 000000000..3d4b90cc9 --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/expected-output.txt @@ -0,0 +1,32 @@ +NOTE: +This output assumes migration.sql has already been applied. + +[SETUP] Supabase Realtime JSONB Filter MRE +[SETUP] Realtime filter used: organization_id=eq.org_123 +[DEBUG] SUPABASE_URL = https://your-project.supabase.co +[subscription] status: SUBSCRIBED +[SUBSCRIBED] Ready to receive events +[INSERT] Creating job with JSONB data: { organization_id: "org_123" } +[INSERT] Row created with ID: 5f8a7f51-c13e-4baa-9ba8-9f0ec7f53d36 +[INSERT] organization_id auto-filled: org_123 +[WAIT] Waiting 8s for realtime event... + +[subscription] event received: { + "schema": "pgboss", + "table": "job", + "eventType": "INSERT", + "new": { + "id": "5f8a7f51-c13e-4baa-9ba8-9f0ec7f53d36", + "organization_id": "org_123", + "data": { "organization_id": "org_123" }, + "created_at": "2026-04-08T12:00:00.000000" + } +} + +[RESULT] Summary: +[RESULT] Events received: 1 + +[RESULT] +[RESULT] ✅ PASS: Realtime event received with direct column filter +[RESULT] ✅ PASS: Trigger kept organization_id in sync +[RESULT] diff --git a/examples/realtime-jsonb-filter-mre/migration.sql b/examples/realtime-jsonb-filter-mre/migration.sql new file mode 100644 index 000000000..2d7836774 --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/migration.sql @@ -0,0 +1,81 @@ +-- ============================================ +-- Supabase Realtime: JSONB Filter Workaround (FIXED) +-- ============================================ + +-- Extensions & Schema +create extension if not exists pgcrypto; +create schema if not exists pgboss; + +-- Table +create table if not exists pgboss.job ( + id uuid primary key default gen_random_uuid(), + data jsonb, + created_at timestamp default now() +); + +-- 1) Add scalar column for filtering +alter table pgboss.job + add column if not exists organization_id text; + +-- 2) Backfill existing rows +update pgboss.job +set organization_id = data->>'organization_id' +where organization_id is distinct from data->>'organization_id'; + +-- 3) Trigger function (schema-safe) +create or replace function pgboss.sync_organization_id() +returns trigger as $$ +begin + new.organization_id := new.data->>'organization_id'; + return new; +end; +$$ language plpgsql; + +-- 4) Trigger +drop trigger if exists sync_organization_id_trigger on pgboss.job; + +create trigger sync_organization_id_trigger +before insert or update on pgboss.job +for each row +execute function pgboss.sync_organization_id(); + +-- 5) Index +create index if not exists idx_job_organization_id +on pgboss.job (organization_id); + +-- 6) Enable RLS +alter table pgboss.job enable row level security; +alter table pgboss.job replica identity full; + +-- 7) RLS Policy (IMPORTANT) +-- Replace with your JWT structure if needed +drop policy if exists "org based access" on pgboss.job; + +create policy "org based access" +on pgboss.job +for select +using ( + organization_id = auth.jwt() ->> 'organization_id' +); + +-- (Optional: allow inserts if needed) +drop policy if exists "allow insert" on pgboss.job; + +create policy "allow insert" +on pgboss.job +for insert +with check (true); + +-- 8) Add to Realtime publication (idempotent) +do $$ +begin + if not exists ( + select 1 + from pg_publication_tables + where pubname = 'supabase_realtime' + and schemaname = 'pgboss' + and tablename = 'job' + ) then + alter publication supabase_realtime add table pgboss.job; + end if; +end $$; \ No newline at end of file diff --git a/examples/realtime-jsonb-filter-mre/package-lock.json b/examples/realtime-jsonb-filter-mre/package-lock.json new file mode 100644 index 000000000..6fb46edd9 --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/package-lock.json @@ -0,0 +1,174 @@ +{ + "name": "realtime-jsonb-filter-mre", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "realtime-jsonb-filter-mre", + "version": "1.0.0", + "dependencies": { + "@supabase/supabase-js": "^2.49.8", + "dotenv": "^16.4.5" + } + }, + "node_modules/@supabase/auth-js": { + "version": "2.102.1", + "resolved": "https://registry.npmjs.org/@supabase/auth-js/-/auth-js-2.102.1.tgz", + "integrity": "sha512-2uH2WB0H98TOGDtaFWhxIcR42Dro/VB7VDZanz/4bVJsqioIue1m3TUqu3xciDm2W9r+1LXQvYNsYbQfWmD+uQ==", + "license": "MIT", + "dependencies": { + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/functions-js": { + "version": "2.102.1", + "resolved": "https://registry.npmjs.org/@supabase/functions-js/-/functions-js-2.102.1.tgz", + "integrity": "sha512-UcrcKTPnAIo+Yp9Jjq9XXwFbsmgRYY637mwka9ZjmTIWcX/xr1pote4OVvaGQycVY1KTiQgjMvpC0Q0yJhRq3w==", + "license": "MIT", + "dependencies": { + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/phoenix": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@supabase/phoenix/-/phoenix-0.4.0.tgz", + "integrity": "sha512-RHSx8bHS02xwfHdAbX5Lpbo6PXbgyf7lTaXTlwtFDPwOIw64NnVRwFAXGojHhjtVYI+PEPNSWwkL90f4agN3bw==", + "license": "MIT" + }, + "node_modules/@supabase/postgrest-js": { + "version": "2.102.1", + "resolved": "https://registry.npmjs.org/@supabase/postgrest-js/-/postgrest-js-2.102.1.tgz", + "integrity": "sha512-InLvXKAYf8BIqiv9jWOYudWB3rU8A9uMbcip5BQ5sLLNPrbO1Ekkr79OvlhZBgMNSppxVyC7wPPGzLxMcTZhlA==", + "license": "MIT", + "dependencies": { + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/realtime-js": { + "version": "2.102.1", + "resolved": "https://registry.npmjs.org/@supabase/realtime-js/-/realtime-js-2.102.1.tgz", + "integrity": "sha512-h2fCumib/v6u7XMwSPgxnpfimjX4xCEayUHrxWLC7UurfQjUZJ0pmJDgm6yj80DnUerxuulRghwm5zXYysFG/Q==", + "license": "MIT", + "dependencies": { + "@supabase/phoenix": "^0.4.0", + "@types/ws": "^8.18.1", + "tslib": "2.8.1", + "ws": "^8.18.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/storage-js": { + "version": "2.102.1", + "resolved": "https://registry.npmjs.org/@supabase/storage-js/-/storage-js-2.102.1.tgz", + "integrity": "sha512-eCL9T4Xpe40nmKlkUJ7Zq/hk34db1xPiT0WL3Iv5MbJqHuCAe5TxhV8Rjqd6DNZrzjtfYObZtYl9jKJaHrivqw==", + "license": "MIT", + "dependencies": { + "iceberg-js": "^0.8.1", + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/supabase-js": { + "version": "2.102.1", + "resolved": "https://registry.npmjs.org/@supabase/supabase-js/-/supabase-js-2.102.1.tgz", + "integrity": "sha512-bChxPVeLDnYN9M2d/u4fXsvylwSQG5grAl+HN8f+ZD9a9PuVU+Ru+xGmEsk+b9Iz3rJC9ZQnQUJYQ28fApdWYA==", + "license": "MIT", + "dependencies": { + "@supabase/auth-js": "2.102.1", + "@supabase/functions-js": "2.102.1", + "@supabase/postgrest-js": "2.102.1", + "@supabase/realtime-js": "2.102.1", + "@supabase/storage-js": "2.102.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@types/node": { + "version": "25.5.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-25.5.2.tgz", + "integrity": "sha512-tO4ZIRKNC+MDWV4qKVZe3Ql/woTnmHDr5JD8UI5hn2pwBrHEwOEMZK7WlNb5RKB6EoJ02gwmQS9OrjuFnZYdpg==", + "license": "MIT", + "dependencies": { + "undici-types": "~7.18.0" + } + }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, + "node_modules/dotenv": { + "version": "16.6.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.6.1.tgz", + "integrity": "sha512-uBq4egWHTcTt33a72vpSG0z3HnPuIl6NqYcTrKEg2azoEyl2hpW0zqlxysq2pK9HlDIHyHyakeYaYnSAwd8bow==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, + "node_modules/iceberg-js": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/iceberg-js/-/iceberg-js-0.8.1.tgz", + "integrity": "sha512-1dhVQZXhcHje7798IVM+xoo/1ZdVfzOMIc8/rgVSijRK38EDqOJoGula9N/8ZI5RD8QTxNQtK/Gozpr+qUqRRA==", + "license": "MIT", + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" + }, + "node_modules/undici-types": { + "version": "7.18.2", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.18.2.tgz", + "integrity": "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w==", + "license": "MIT" + }, + "node_modules/ws": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/examples/realtime-jsonb-filter-mre/package.json b/examples/realtime-jsonb-filter-mre/package.json new file mode 100644 index 000000000..8a58abb9b --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/package.json @@ -0,0 +1,14 @@ +{ + "name": "realtime-jsonb-filter-mre", + "version": "1.0.0", + "private": true, + "type": "module", + "description": "Minimal reproducible example: Supabase Realtime does not support JSONB filters", + "scripts": { + "start": "node run-demo.mjs" + }, + "dependencies": { + "@supabase/supabase-js": "2.49.8", + "dotenv": "^16.4.5" + } +} diff --git a/examples/realtime-jsonb-filter-mre/run-demo.mjs b/examples/realtime-jsonb-filter-mre/run-demo.mjs new file mode 100644 index 000000000..e9c3d6443 --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/run-demo.mjs @@ -0,0 +1,125 @@ +import { subscribeToJobsByOrganization, supabase, waitForSubscribed } from './subscription-client.mjs' + +const ORG_ID = 'org_123' +const WAIT_MS = 8000 +const pause = (ms) => new Promise((r) => setTimeout(r, ms)) + +function isSchemaSetupError(error) { + const message = String(error?.message ?? error ?? '').toLowerCase() + return ( + message.includes('invalid schema') || + message.includes('schema does not exist') || + message.includes('relation does not exist') + ) +} + +function printSchemaSetupHelp() { + console.log('[ERROR] Database schema not found') + console.log('') + console.log('This demo requires the pgboss schema and job table, and pgboss must be exposed to the API.') + console.log('') + console.log('👉 Fix:') + console.log('1. Open your Supabase dashboard') + console.log('2. Confirm this project URL matches [DEBUG] SUPABASE_URL above') + console.log('3. Go to SQL Editor and run migration.sql from this project') + console.log('4. Go to Project Settings → API → Exposed schemas, add: pgboss') + console.log('5. Re-run: npm start') + console.log('') + console.log('Quick SQL check:') + console.log('select to_regclass(\'pgboss.job\');') +} + +async function main() { + console.log('[SETUP] Supabase Realtime JSONB Filter MRE') + console.log('[DEBUG] SUPABASE_URL =', process.env.SUPABASE_URL) + console.log(`[SETUP] Realtime filter used: organization_id=eq.${ORG_ID}`) + + let channel + let eventsReceived = 0 + + // Pre-check setup before subscribing/inserting. + const { error: setupCheckError } = await supabase + .schema('pgboss') + .from('job') + .select('id') + .limit(1) + + if (setupCheckError) { + if (isSchemaSetupError(setupCheckError)) { + printSchemaSetupHelp() + process.exit(1) + } + + throw new Error(`[ERROR] Setup check failed: ${setupCheckError.message}`) + } + + channel = subscribeToJobsByOrganization(ORG_ID, () => { + eventsReceived += 1 + }) + + await waitForSubscribed(channel, 'subscription') + console.log('[SUBSCRIBED] Ready to receive events') + + let inserted + + try { + console.log('[INSERT] Creating job with JSONB data: { organization_id: "org_123" }') + const { data: insertedRows, error: insertError } = await supabase + .schema('pgboss') + .from('job') + .insert({ data: { organization_id: ORG_ID } }) + .select('id, data, organization_id, created_at') + + if (insertError) throw insertError + inserted = insertedRows?.[0] + } catch (error) { + if (isSchemaSetupError(error)) { + printSchemaSetupHelp() + process.exit(1) + } + + throw new Error(`[ERROR] Insert failed: ${error.message}`) + } + + console.log(`[INSERT] Row created with ID: ${inserted.id}`) + console.log(`[INSERT] organization_id auto-filled: ${inserted.organization_id}`) + + if (!inserted || inserted.organization_id !== ORG_ID) { + throw new Error( + `[ERROR] Trigger sync failed: expected ${ORG_ID}, got ${inserted?.organization_id ?? 'null'}` + ) + } + + console.log(`[WAIT] Waiting ${WAIT_MS / 1000}s for realtime event...`) + await pause(WAIT_MS) + + console.log('') + console.log('[RESULT] Summary:') + console.log(`[RESULT] Events received: ${eventsReceived}`) + + if (eventsReceived < 1) { + console.log('[RESULT]') + console.log('[RESULT] ❌ FAIL: No realtime event received') + console.log('[RESULT]') + console.log('[RESULT] This proves: JSONB filter would NOT work (data->>organization_id)') + console.log('[RESULT] But direct column filter DOES work (organization_id)') + throw new Error('Expected 1+ event but received none') + } + + console.log('[RESULT]') + console.log('[RESULT] ✅ PASS: Realtime event received with direct column filter') + console.log('[RESULT] ✅ PASS: Trigger kept organization_id in sync') + console.log('[RESULT]') + + if (channel) await supabase.removeChannel(channel) +} + +main().catch((err) => { + if (isSchemaSetupError(err)) { + printSchemaSetupHelp() + process.exit(1) + } + + console.error('[ERROR]', err.message) + process.exit(1) +}) diff --git a/examples/realtime-jsonb-filter-mre/subscription-client.mjs b/examples/realtime-jsonb-filter-mre/subscription-client.mjs new file mode 100644 index 000000000..fb9a7c28a --- /dev/null +++ b/examples/realtime-jsonb-filter-mre/subscription-client.mjs @@ -0,0 +1,57 @@ +import 'dotenv/config' +import { createClient } from '@supabase/supabase-js' + +const supabaseUrl = process.env.SUPABASE_URL +const supabaseAnonKey = process.env.SUPABASE_ANON_KEY + +if (!supabaseUrl || !supabaseAnonKey) { + throw new Error('Missing SUPABASE_URL or SUPABASE_ANON_KEY') +} + +export const supabase = createClient(supabaseUrl, supabaseAnonKey) + +/** + * Subscribe to jobs by organization using Realtime filters. + * + * Note: Realtime does not support JSONB expression filters like `data->>organization_id=eq.value`. + * We use a dedicated scalar `organization_id` column instead, kept in sync via database trigger. + */ +export function subscribeToJobsByOrganization(organizationId, onPayload) { + const filter = `organization_id=eq.${organizationId}` + + const channel = supabase + .channel(`jobs-org-${organizationId}`) + .on( + 'postgres_changes', + { + event: '*', + schema: 'pgboss', + table: 'job', + filter + }, + (payload) => { + console.log('[subscription] event received:', JSON.stringify(payload, null, 2)) + onPayload?.(payload) + } + ) + + return channel +} + +export function waitForSubscribed(channel, label = 'channel') { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error(`${label} subscribe timeout`)), 15000) + + channel.subscribe((status) => { + console.log(`[${label}] status:`, status) + if (status === 'SUBSCRIBED') { + clearTimeout(timeout) + resolve() + } + if (status === 'CHANNEL_ERROR' || status === 'TIMED_OUT' || status === 'CLOSED') { + clearTimeout(timeout) + reject(new Error(`${label} failed with status ${status}`)) + } + }) + }) +}