diff --git a/FEATURES.md b/FEATURES.md new file mode 100644 index 000000000..f6896e3b0 --- /dev/null +++ b/FEATURES.md @@ -0,0 +1,185 @@ +# Realtime Features + +Realtime exposes three features over WebSockets. All three share the same connection and channel model: a client opens a single WebSocket, then joins one or more named topics (channels). Each feature is opted into per channel via the `config` payload at join time. + +--- + +## Broadcast + +Send ephemeral messages between clients subscribed to the same topic with low latency. + +### How it works + +1. A client sends a `broadcast` event to a channel topic. +2. The server validates the payload size against the tenant's `max_payload_size_in_kb` limit. +3. On private channels, RLS write policies are checked before the message is forwarded. +4. The message is dispatched to all other subscribers on the topic. By default, the sender is excluded. + +### Configuration + +Set at channel join time under `config.broadcast`: + +| Key | Type | Description | +|-----|------|-------------| +| `self` | boolean | When `true`, the sender also receives its own broadcast messages. Default `false`. | +| `ack` | boolean | When `true`, the server replies with `:ok` after dispatching. Default `false` (fire-and-forget). | + +### Constraints + +- Payload size is capped at the tenant's `max_payload_size_in_kb` (with a ±500 byte margin). +- Rate is capped at the tenant's configured events-per-second limit. Exceeding it disconnects the client. +- On private channels, if the write policy denies access, the message is silently dropped. + +--- + +## Presence + +Track and synchronize which clients are connected, along with arbitrary metadata. + +### How it works + +1. A client sends a `presence` event with `track` or `untrack` action and an optional payload map. +2. `track` creates or updates the client's presence entry in a distributed ETS-backed store (Phoenix.Presence). +3. All subscribers on the topic receive a `presence_diff` message with the join/leave delta. +4. New joiners receive a `presence_state` message with the full current state for that topic. +5. `untrack` removes the entry immediately. + +Presence state is keyed per client (UUID by default, or a caller-provided string via `config.presence.key`). Duplicate `track` calls with the same payload are no-ops. + +### Configuration + +Set at channel join time under `config.presence`: + +| Key | Type | Description | +|-----|------|-------------| +| `key` | string | Custom key to identify this client's presence entry. Defaults to a generated UUID. | + +### Constraints + +- Payload must be a JSON object (map). +- Server-side rate limit: `max_presence_events_per_second` per tenant. +- Client-side rate limit: 5 events per 30-second window by default. +- Payload size is subject to the same `max_payload_size_in_kb` limit as Broadcast. +- On private channels, RLS read and write policies are enforced. + +--- + +## Postgres Changes + +Listen to INSERT, UPDATE, and DELETE events on Postgres tables and receive them in real time. + +### How it works + +1. At channel join, the client declares one or more subscriptions in `config.postgres_changes`. Each subscription specifies an event type, a table, and optional filters. +2. The server creates a `realtime.subscription` row in the tenant database for each subscription, storing the filters alongside the subscriber identity. +3. A replication poller reads changes from a logical replication slot (WAL) on a configurable interval. +4. For each WAL record, the tenant database runs `realtime.apply_rls()`, which: + - Checks RLS policies for each subscribing role. + - Runs `realtime.is_visible_through_filters()` to match the row against each subscription's filters. +5. Matching subscription IDs are returned alongside the change and dispatched to the appropriate clients. + +### Subscription parameters + +| Key | Required | Description | +|-----|----------|-------------| +| `event` | Yes | `INSERT`, `UPDATE`, `DELETE`, or `*` for all three. | +| `schema` | Yes | Database schema name (e.g. `public`). | +| `table` | No | Table name. Omit to listen to all tables in the schema. | +| `filter` | No | Filter expression (see below). | + +### Filters + +Filters limit which change events are delivered to a subscriber. They are evaluated inside the tenant database using the subscriber's role, so column-level privileges and RLS policies are always respected. + +#### Syntax + +A single filter has the form: + +``` +column=operator.value +``` + +Multiple filters are separated by commas. All conditions must be true for a change to be delivered (AND semantics): + +``` +col1=operator.value,col2=operator.value +``` + +Values that contain a comma or whitespace must be wrapped in double quotes: + +``` +col="some value, with comma" +``` + +#### Comparison operators + +These operators cast both sides to the column's declared type before comparing. + +| Operator | SQL | Description | +|----------|-----|-------------| +| `eq` | `=` | Column equals value. | +| `neq` | `!=` | Column does not equal value. | +| `lt` | `<` | Column is less than value. | +| `lte` | `<=` | Column is less than or equal to value. | +| `gt` | `>` | Column is greater than value. | +| `gte` | `>=` | Column is greater than or equal to value. | + +#### List operators + +The value is a parenthesised, comma-separated list: `(val1,val2,val3)`. + +| Operator | SQL | Description | +|----------|-----|-------------| +| `in` | `= ANY(array)` | Column value appears in the list. | +| `not_in` | `!= ALL(array)` | Column value does not appear in the list. | + +Maximum 100 values per list. The list is cast to an array of the column's type. + +#### Pattern matching operators + +Standard SQL pattern syntax applies: `%` matches any sequence of characters, `_` matches any single character. + +| Operator | SQL | Description | +|----------|-----|-------------| +| `like` | `LIKE` | Column matches pattern (case-sensitive). | +| `ilike` | `ILIKE` | Column matches pattern (case-insensitive). | +| `not_like` | `NOT LIKE` | Column does not match pattern (case-sensitive). | +| `not_ilike` | `NOT ILIKE` | Column does not match pattern (case-insensitive). | + +#### Identity operators + +These operators use SQL `IS` / `IS NOT`, which handle `NULL` correctly. The value must be one of the four SQL keywords: `null`, `true`, `false`, `unknown`. + +| Operator | SQL | Accepted values | +|----------|-----|-----------------| +| `is` | `IS` | `null`, `true`, `false`, `unknown` | +| `not_is` | `IS NOT` | `null`, `true`, `false`, `unknown` | + +Unlike `eq`, these operators do not cast the right-hand side to the column type — they use the SQL keyword directly. This is the only correct way to test for `NULL`: `col=eq.null` translates to `col = NULL` which always yields `NULL` in SQL and never matches; `col=is.null` translates to `col IS NULL` and works as expected. + +#### AND composition + +All filters on a subscription are combined with AND. There is no OR operator at the filter level; OR conditions require separate subscriptions. + +### DELETE events + +For DELETE events, filters are applied against the old row values (the record as it existed before deletion), since the row no longer exists in the table. + +### Constraints + +- The subscribing role must have `SELECT` privilege on any column used in a filter. +- Filter values must be coercible to the column's type. Invalid values are rejected at subscription creation. +- `in` and `not_in` accept a maximum of 100 values. +- `is` and `not_is` only accept `null`, `true`, `false`, or `unknown`. +- RLS policies on the table are enforced — subscribers only receive rows they are permitted to read. + +--- + +## Connection model + +All three features share the same connection layer: + +- One WebSocket per client. +- Up to `max_channels_per_client` topics per connection (default 100). +- JWT required on connect; re-validated every 5 minutes. Expired tokens disconnect the channel immediately. +- Token can be refreshed mid-connection by sending an `access_token` message on the socket. diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 7fdf191a2..132f4d72f 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -58,7 +58,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do slot_name: extension["slot_name"] <> slot_name_suffix(), tenant_id: tenant_id, rate_counter_args: rate_counter_args, - subscribers_nodes_table: args["subscribers_nodes_table"] + subscribers_nodes_table: args["subscribers_nodes_table"], + subscribers_pids_table: args["subscribers_pids_table"] } {:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{}) @@ -97,6 +98,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do conn: conn, tenant_id: tenant_id, subscribers_nodes_table: subscribers_nodes_table, + subscribers_pids_table: subscribers_pids_table, rate_counter_args: rate_counter_args } = state ) do @@ -153,6 +155,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}} + {:error, %Postgrex.Error{postgres: %{code: :raise_exception, message: msg}}} -> + log_error("SubscriptionFatalError", msg) + + for {pid, _id, _ref, _node} <- :ets.tab2list(subscribers_pids_table) do + send(pid, {:subscription_fatal_error, msg}) + end + + {:noreply, state} + {:error, reason} -> log_error("PoolingReplicationError", reason) diff --git a/lib/extensions/postgres_cdc_rls/subscriptions.ex b/lib/extensions/postgres_cdc_rls/subscriptions.ex index 02f3aa563..bc7d9467e 100644 --- a/lib/extensions/postgres_cdc_rls/subscriptions.ex +++ b/lib/extensions/postgres_cdc_rls/subscriptions.ex @@ -14,7 +14,31 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do %{id: binary, claims: map, subscription_params: subscription_params} ] - @filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"] + @filter_types [ + "eq", + "neq", + "lt", + "lte", + "gt", + "gte", + "in", + "like", + "ilike", + "is" + ] + + @not_op_map %{ + "eq" => "neq", + "neq" => "eq", + "lt" => "gte", + "lte" => "gt", + "gt" => "lte", + "gte" => "lt", + "in" => "not_in", + "like" => "not_like", + "ilike" => "not_ilike", + "is" => "not_is" + } @spec create(conn(), String.t(), subscription_list, pid(), pid()) :: {:ok, Postgrex.Result.t()} @@ -200,7 +224,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do @doc """ Parses subscription filter parameters into something we can pass into our `create_subscription` query. - We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in' + We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in', 'like', 'ilike', 'is'. + Negated variants are expressed with the `not.` prefix: `not.eq`, `not.lt`, `not.in`, `not.like`, `not.ilike`, `not.is`, etc. Multiple filters can be combined with commas and are applied as AND conditions: `"col1=eq.val,col2=gt.5"` means `col1 = val AND col2 > 5`. @@ -245,8 +270,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do An unsupported filter will respond with an error tuple: - iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"}) - {:error, ~s(Error parsing `filter` params: ["like", "hey"])} + iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=fts.hey"}) + {:error, ~s(Error parsing `filter` params: ["fts", "hey"])} Catch `undefined` filters: @@ -312,36 +337,41 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do defp parse_filters(filter) when is_binary(filter) do case String.trim(filter) do "" -> {:ok, []} - trimmed -> scan(trimmed, trimmed, 0, 0, 0, []) + trimmed -> scan(trimmed, trimmed, 0, 0, 0, false, []) end end # Reached end of binary — parse the final segment - defp scan(<<>>, orig, start, len, _depth, acc) do + defp scan(<<>>, orig, start, len, _depth, _quoted, acc) do case parse_segment(binary_part(orig, start, len)) do {:ok, parsed} -> {:ok, Enum.reverse([parsed | acc])} {:error, _} = e -> e end end - defp scan(<<"(", rest::binary>>, orig, start, len, depth, acc) do - scan(rest, orig, start, len + 1, depth + 1, acc) + # Toggle quoted mode on double-quote; parens and commas have no special meaning while quoted + defp scan(<<"\"", rest::binary>>, orig, start, len, depth, quoted, acc) do + scan(rest, orig, start, len + 1, depth, not quoted, acc) end - defp scan(<<")", rest::binary>>, orig, start, len, depth, acc) do - scan(rest, orig, start, len + 1, max(0, depth - 1), acc) + defp scan(<<"(", rest::binary>>, orig, start, len, depth, false = quoted, acc) do + scan(rest, orig, start, len + 1, depth + 1, quoted, acc) end - # Comma at depth 0 — segment boundary - defp scan(<<",", rest::binary>>, orig, start, len, 0, acc) do + defp scan(<<")", rest::binary>>, orig, start, len, depth, false = quoted, acc) do + scan(rest, orig, start, len + 1, max(0, depth - 1), quoted, acc) + end + + # Comma at depth 0 and not inside quotes — segment boundary + defp scan(<<",", rest::binary>>, orig, start, len, 0, false, acc) do case parse_segment(binary_part(orig, start, len)) do - {:ok, parsed} -> scan(rest, orig, start + len + 1, 0, 0, [parsed | acc]) + {:ok, parsed} -> scan(rest, orig, start + len + 1, 0, 0, false, [parsed | acc]) {:error, _} = e -> e end end - defp scan(<<_::8, rest::binary>>, orig, start, len, depth, acc) do - scan(rest, orig, start, len + 1, depth, acc) + defp scan(<<_::8, rest::binary>>, orig, start, len, depth, quoted, acc) do + scan(rest, orig, start, len + 1, depth, quoted, acc) end defp parse_segment(segment) do @@ -351,8 +381,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do trimmed -> with [col, rest] <- String.split(trimmed, "=", parts: 2), - [filter_type, value] when filter_type in @filter_types <- - String.split(rest, ".", parts: 2), + {:ok, filter_type, value} <- parse_filter_rest(rest), {:ok, formatted_value} <- format_filter_value(filter_type, value) do {:ok, {col, filter_type, formatted_value}} else @@ -362,13 +391,43 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do end end - defp format_filter_value("in", value) do + defp parse_filter_rest("not." <> rest) do + case String.split(rest, ".", parts: 2) do + [raw_op, value] -> + case @not_op_map[raw_op] do + nil -> {:error, "not.#{raw_op} is not a supported operator"} + op -> {:ok, op, value} + end + + _ -> + {:error, inspect("not." <> rest)} + end + end + + defp parse_filter_rest(rest) do + case String.split(rest, ".", parts: 2) do + [filter_type, value] when filter_type in @filter_types -> {:ok, filter_type, value} + other -> {:error, inspect(other)} + end + end + + defp format_filter_value(op, value) when op in ["in", "not_in"] do size = byte_size(value) if size >= 2 and binary_part(value, 0, 1) == "(" and binary_part(value, size - 1, 1) == ")" do {:ok, "{" <> binary_part(value, 1, size - 2) <> "}"} else - {:error, "`in` filter value must be wrapped by parentheses"} + {:error, "`#{op}` filter value must be wrapped by parentheses"} + end + end + + defp format_filter_value(_filter, "\"" <> rest) do + size = byte_size(rest) + + if size >= 1 and binary_part(rest, size - 1, 1) == "\"" do + {:ok, binary_part(rest, 0, size - 1)} + else + {:error, "unmatched double-quote in filter value"} end end diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index e9da9ead9..1123c1bd3 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -85,7 +85,8 @@ defmodule Realtime.Tenants.Migrations do FilterActionPostgresChanges, FixByteaDoubleEncodingInCast, ListChangesWithSlotCount, - AddBinaryPayloadToMessages + AddBinaryPayloadToMessages, + AddLikeIlikeIsNotOpsToFilters } @migrations [ @@ -158,7 +159,8 @@ defmodule Realtime.Tenants.Migrations do {20_251_120_215_549, FilterActionPostgresChanges}, {20_260_218_120_000, FixByteaDoubleEncodingInCast}, {20_260_326_120_000, ListChangesWithSlotCount}, - {20_260_514_120_000, AddBinaryPayloadToMessages} + {20_260_514_120_000, AddBinaryPayloadToMessages}, + {20_260_528_120_000, AddLikeIlikeIsNotOpsToFilters} ] defstruct [:tenant_external_id, :settings, migrations_ran: 0] diff --git a/lib/realtime/tenants/repo/migrations/20260528120000_add_like_ilike_is_not_ops_to_filters.ex b/lib/realtime/tenants/repo/migrations/20260528120000_add_like_ilike_is_not_ops_to_filters.ex new file mode 100644 index 000000000..bc38f180a --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20260528120000_add_like_ilike_is_not_ops_to_filters.ex @@ -0,0 +1,143 @@ +defmodule Realtime.Tenants.Migrations.AddLikeIlikeIsNotOpsToFilters do + @moduledoc false + + use Ecto.Migration + + def change do + execute("alter type realtime.equality_op add value 'like';") + execute("alter type realtime.equality_op add value 'ilike';") + execute("alter type realtime.equality_op add value 'is';") + execute("alter type realtime.equality_op add value 'not_in';") + execute("alter type realtime.equality_op add value 'not_like';") + execute("alter type realtime.equality_op add value 'not_ilike';") + execute("alter type realtime.equality_op add value 'not_is';") + + execute(" + create or replace function realtime.check_equality_op( + op realtime.equality_op, + type_ regtype, + val_1 text, + val_2 text + ) + returns bool + immutable + language plpgsql + as $$ + declare + op_symbol text; + res boolean; + begin + -- IS / IS NOT require keyword RHS, not a typed literal + if op = 'is' or op = 'not_is' then + if val_2 not in ('null', 'true', 'false', 'unknown') then + raise exception 'invalid value for is/not_is filter: must be null, true, false, or unknown'; + end if; + execute format( + 'select %L::%s %s %s', + val_1, + type_::text, + case when op = 'is' then 'IS' else 'IS NOT' end, + upper(val_2) + ) into res; + return res; + end if; + + op_symbol = case + when op = 'eq' then '=' + when op = 'neq' then '!=' + when op = 'lt' then '<' + when op = 'lte' then '<=' + when op = 'gt' then '>' + when op = 'gte' then '>=' + when op = 'in' then '= any' + when op = 'not_in' then '!= all' + when op = 'like' then 'LIKE' + when op = 'ilike' then 'ILIKE' + when op = 'not_like' then 'NOT LIKE' + when op = 'not_ilike' then 'NOT ILIKE' + else null + end; + + if op_symbol is null then + raise exception 'unsupported equality operator: %', op::text; + end if; + + execute format( + 'select %L::'|| type_::text || ' ' || op_symbol + || ' ( %L::' + || ( + case + when op in ('in', 'not_in') then type_::text || '[]' + else type_::text + end + ) + || ')', val_1, val_2) into res; + return res; + end; + $$; + ") + + execute(" + create or replace function realtime.subscription_check_filters() + returns trigger + language plpgsql + as $$ + declare + col_names text[] = coalesce( + array_agg(c.column_name order by c.ordinal_position), + '{}'::text[] + ) + from + information_schema.columns c + where + format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity + and pg_catalog.has_column_privilege( + (new.claims ->> 'role'), + format('%I.%I', c.table_schema, c.table_name)::regclass, + c.column_name, + 'SELECT' + ); + filter realtime.user_defined_filter; + col_type regtype; + in_val jsonb; + begin + for filter in select * from unnest(new.filters) loop + if not filter.column_name = any(col_names) then + raise exception 'invalid column for filter %', filter.column_name; + end if; + + col_type = ( + select atttypid::regtype + from pg_catalog.pg_attribute + where attrelid = new.entity + and attname = filter.column_name + ); + if col_type is null then + raise exception 'failed to lookup type for column %', filter.column_name; + end if; + + if filter.op in ('in'::realtime.equality_op, 'not_in'::realtime.equality_op) then + in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype); + if coalesce(jsonb_array_length(in_val), 0) > 100 then + raise exception 'too many values for `in`/`not_in` filter. Maximum 100'; + end if; + elsif filter.op in ('is'::realtime.equality_op, 'not_is'::realtime.equality_op) then + if filter.value not in ('null', 'true', 'false', 'unknown') then + raise exception 'invalid value for is/not_is filter: must be null, true, false, or unknown'; + end if; + else + perform realtime.cast(filter.value, col_type); + end if; + end loop; + + new.filters = coalesce( + array_agg(f order by f.column_name, f.op, f.value), + '{}' + ) from unnest(new.filters) f; + + return new; + end; + $$; + ") + end +end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index b1797ed4f..ba2ec32eb 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -390,6 +390,11 @@ defmodule RealtimeWeb.RealtimeChannel do end def handle_info(:sync_presence, socket), do: {:noreply, socket} + + def handle_info({:subscription_fatal_error, reason}, socket) do + shutdown_response(socket, reason) + end + def handle_info(_, socket), do: {:noreply, socket} @impl true diff --git a/priv/repo/tenant_db_baseline.json b/priv/repo/tenant_db_baseline.json index a4610c26a..0577ddc68 100644 --- a/priv/repo/tenant_db_baseline.json +++ b/priv/repo/tenant_db_baseline.json @@ -410,6 +410,34 @@ { "sort_order": 7, "label": "in" + }, + { + "sort_order": 8, + "label": "like" + }, + { + "sort_order": 9, + "label": "ilike" + }, + { + "sort_order": 10, + "label": "is" + }, + { + "sort_order": 11, + "label": "not_in" + }, + { + "sort_order": 12, + "label": "not_like" + }, + { + "sort_order": 13, + "label": "not_ilike" + }, + { + "sort_order": 14, + "label": "not_is" } ], "comment": null, @@ -1456,10 +1484,10 @@ "all_argument_types": [], "argument_modes": null, "argument_defaults": null, - "source_code": "\n /*\n Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness\n */\n declare\n op_symbol text = (\n case\n when op = 'eq' then '='\n when op = 'neq' then '!='\n when op = 'lt' then '<'\n when op = 'lte' then '<='\n when op = 'gt' then '>'\n when op = 'gte' then '>='\n when op = 'in' then '= any'\n else 'UNKNOWN OP'\n end\n );\n res boolean;\n begin\n execute format(\n 'select %L::'|| type_::text || ' ' || op_symbol\n || ' ( %L::'\n || (\n case\n when op = 'in' then type_::text || '[]'\n else type_::text end\n )\n || ')', val_1, val_2) into res;\n return res;\n end;\n ", + "source_code": "\n declare\n op_symbol text;\n res boolean;\n begin\n -- IS / IS NOT require keyword RHS, not a typed literal\n if op = 'is' or op = 'not_is' then\n if val_2 not in ('null', 'true', 'false', 'unknown') then\n raise exception 'invalid value for is/not_is filter: must be null, true, false, or unknown';\n end if;\n execute format(\n 'select %L::%s %s %s',\n val_1,\n type_::text,\n case when op = 'is' then 'IS' else 'IS NOT' end,\n upper(val_2)\n ) into res;\n return res;\n end if;\n\n op_symbol = case\n when op = 'eq' then '='\n when op = 'neq' then '!='\n when op = 'lt' then '<'\n when op = 'lte' then '<='\n when op = 'gt' then '>'\n when op = 'gte' then '>='\n when op = 'in' then '= any'\n when op = 'not_in' then '!= all'\n when op = 'like' then 'LIKE'\n when op = 'ilike' then 'ILIKE'\n when op = 'not_like' then 'NOT LIKE'\n when op = 'not_ilike' then 'NOT ILIKE'\n else null\n end;\n\n if op_symbol is null then\n raise exception 'unsupported equality operator: %', op::text;\n end if;\n\n execute format(\n 'select %L::'|| type_::text || ' ' || op_symbol\n || ' ( %L::'\n || (\n case\n when op in ('in', 'not_in') then type_::text || '[]'\n else type_::text\n end\n )\n || ')', val_1, val_2) into res;\n return res;\n end;\n ", "binary_path": null, "sql_body": null, - "definition": "CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text)\n RETURNS boolean\n LANGUAGE plpgsql\n IMMUTABLE\nAS $function$\n /*\n Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness\n */\n declare\n op_symbol text = (\n case\n when op = 'eq' then '='\n when op = 'neq' then '!='\n when op = 'lt' then '<'\n when op = 'lte' then '<='\n when op = 'gt' then '>'\n when op = 'gte' then '>='\n when op = 'in' then '= any'\n else 'UNKNOWN OP'\n end\n );\n res boolean;\n begin\n execute format(\n 'select %L::'|| type_::text || ' ' || op_symbol\n || ' ( %L::'\n || (\n case\n when op = 'in' then type_::text || '[]'\n else type_::text end\n )\n || ')', val_1, val_2) into res;\n return res;\n end;\n $function$\n", + "definition": "CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text)\n RETURNS boolean\n LANGUAGE plpgsql\n IMMUTABLE\nAS $function$\n declare\n op_symbol text;\n res boolean;\n begin\n -- IS / IS NOT require keyword RHS, not a typed literal\n if op = 'is' or op = 'not_is' then\n if val_2 not in ('null', 'true', 'false', 'unknown') then\n raise exception 'invalid value for is/not_is filter: must be null, true, false, or unknown';\n end if;\n execute format(\n 'select %L::%s %s %s',\n val_1,\n type_::text,\n case when op = 'is' then 'IS' else 'IS NOT' end,\n upper(val_2)\n ) into res;\n return res;\n end if;\n\n op_symbol = case\n when op = 'eq' then '='\n when op = 'neq' then '!='\n when op = 'lt' then '<'\n when op = 'lte' then '<='\n when op = 'gt' then '>'\n when op = 'gte' then '>='\n when op = 'in' then '= any'\n when op = 'not_in' then '!= all'\n when op = 'like' then 'LIKE'\n when op = 'ilike' then 'ILIKE'\n when op = 'not_like' then 'NOT LIKE'\n when op = 'not_ilike' then 'NOT ILIKE'\n else null\n end;\n\n if op_symbol is null then\n raise exception 'unsupported equality operator: %', op::text;\n end if;\n\n execute format(\n 'select %L::'|| type_::text || ' ' || op_symbol\n || ' ( %L::'\n || (\n case\n when op in ('in', 'not_in') then type_::text || '[]'\n else type_::text\n end\n )\n || ')', val_1, val_2) into res;\n return res;\n end;\n $function$\n", "config": null, "owner": "supabase_admin", "comment": null, @@ -1753,7 +1781,7 @@ ], "security_labels": [] }, - "procedure:realtime.send(jsonb,text,text,boolean)": { + "procedure:realtime.send(bytea,text,text,boolean)": { "schema": "realtime", "name": "send", "kind": "f", @@ -1777,7 +1805,7 @@ "private" ], "argument_types": [ - "jsonb", + "bytea", "text", "text", "boolean" @@ -1785,10 +1813,10 @@ "all_argument_types": [], "argument_modes": null, "argument_defaults": "true", - "source_code": "\nDECLARE\n generated_id uuid;\n final_payload jsonb;\nBEGIN\n BEGIN\n -- Generate a new UUID for the id\n generated_id := gen_random_uuid();\n\n -- Check if payload has an 'id' key, if not, add the generated UUID\n IF payload ? 'id' THEN\n final_payload := payload;\n ELSE\n final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));\n END IF;\n\n -- Set the topic configuration\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n -- Attempt to insert the message\n INSERT INTO realtime.messages (id, payload, event, topic, private, extension)\n VALUES (generated_id, final_payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n -- Capture and notify the error\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n", + "source_code": "\nDECLARE\n generated_id uuid;\nBEGIN\n BEGIN\n generated_id := gen_random_uuid();\n\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n INSERT INTO realtime.messages (id, binary_payload, event, topic, private, extension)\n VALUES (generated_id, payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n", "binary_path": null, "sql_body": null, - "definition": "CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true)\n RETURNS void\n LANGUAGE plpgsql\nAS $function$\nDECLARE\n generated_id uuid;\n final_payload jsonb;\nBEGIN\n BEGIN\n -- Generate a new UUID for the id\n generated_id := gen_random_uuid();\n\n -- Check if payload has an 'id' key, if not, add the generated UUID\n IF payload ? 'id' THEN\n final_payload := payload;\n ELSE\n final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));\n END IF;\n\n -- Set the topic configuration\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n -- Attempt to insert the message\n INSERT INTO realtime.messages (id, payload, event, topic, private, extension)\n VALUES (generated_id, final_payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n -- Capture and notify the error\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n$function$\n", + "definition": "CREATE OR REPLACE FUNCTION realtime.send(payload bytea, event text, topic text, private boolean DEFAULT true)\n RETURNS void\n LANGUAGE plpgsql\nAS $function$\nDECLARE\n generated_id uuid;\nBEGIN\n BEGIN\n generated_id := gen_random_uuid();\n\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n INSERT INTO realtime.messages (id, binary_payload, event, topic, private, extension)\n VALUES (generated_id, payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n$function$\n", "config": null, "owner": "supabase_admin", "comment": null, @@ -1816,7 +1844,7 @@ ], "security_labels": [] }, - "procedure:realtime.send(bytea,text,text,boolean)": { + "procedure:realtime.send(jsonb,text,text,boolean)": { "schema": "realtime", "name": "send", "kind": "f", @@ -1840,7 +1868,7 @@ "private" ], "argument_types": [ - "bytea", + "jsonb", "text", "text", "boolean" @@ -1848,10 +1876,10 @@ "all_argument_types": [], "argument_modes": null, "argument_defaults": "true", - "source_code": "\nDECLARE\n generated_id uuid;\nBEGIN\n BEGIN\n generated_id := gen_random_uuid();\n\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n INSERT INTO realtime.messages (id, binary_payload, event, topic, private, extension)\n VALUES (generated_id, payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n", + "source_code": "\nDECLARE\n generated_id uuid;\n final_payload jsonb;\nBEGIN\n BEGIN\n -- Generate a new UUID for the id\n generated_id := gen_random_uuid();\n\n -- Check if payload has an 'id' key, if not, add the generated UUID\n IF payload ? 'id' THEN\n final_payload := payload;\n ELSE\n final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));\n END IF;\n\n -- Set the topic configuration\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n -- Attempt to insert the message\n INSERT INTO realtime.messages (id, payload, event, topic, private, extension)\n VALUES (generated_id, final_payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n -- Capture and notify the error\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n", "binary_path": null, "sql_body": null, - "definition": "CREATE OR REPLACE FUNCTION realtime.send(payload bytea, event text, topic text, private boolean DEFAULT true)\n RETURNS void\n LANGUAGE plpgsql\nAS $function$\nDECLARE\n generated_id uuid;\nBEGIN\n BEGIN\n generated_id := gen_random_uuid();\n\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n INSERT INTO realtime.messages (id, binary_payload, event, topic, private, extension)\n VALUES (generated_id, payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n$function$\n", + "definition": "CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true)\n RETURNS void\n LANGUAGE plpgsql\nAS $function$\nDECLARE\n generated_id uuid;\n final_payload jsonb;\nBEGIN\n BEGIN\n -- Generate a new UUID for the id\n generated_id := gen_random_uuid();\n\n -- Check if payload has an 'id' key, if not, add the generated UUID\n IF payload ? 'id' THEN\n final_payload := payload;\n ELSE\n final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id));\n END IF;\n\n -- Set the topic configuration\n EXECUTE format('SET LOCAL realtime.topic TO %L', topic);\n\n -- Attempt to insert the message\n INSERT INTO realtime.messages (id, payload, event, topic, private, extension)\n VALUES (generated_id, final_payload, event, topic, private, 'broadcast');\n EXCEPTION\n WHEN OTHERS THEN\n -- Capture and notify the error\n RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM;\n END;\nEND;\n$function$\n", "config": null, "owner": "supabase_admin", "comment": null, @@ -1901,10 +1929,10 @@ "all_argument_types": [], "argument_modes": null, "argument_defaults": null, - "source_code": "\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that the claimed role may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n\n in_val jsonb;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n\n -- Set maximum number of entries for in filter\n if filter.op = 'in'::realtime.equality_op then\n in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);\n if coalesce(jsonb_array_length(in_val), 0) > 100 then\n raise exception 'too many values for `in` filter. Maximum 100';\n end if;\n else\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end if;\n\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (subscription_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n ", + "source_code": "\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n in_val jsonb;\n begin\n for filter in select * from unnest(new.filters) loop\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n\n if filter.op in ('in'::realtime.equality_op, 'not_in'::realtime.equality_op) then\n in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);\n if coalesce(jsonb_array_length(in_val), 0) > 100 then\n raise exception 'too many values for `in`/`not_in` filter. Maximum 100';\n end if;\n elsif filter.op in ('is'::realtime.equality_op, 'not_is'::realtime.equality_op) then\n if filter.value not in ('null', 'true', 'false', 'unknown') then\n raise exception 'invalid value for is/not_is filter: must be null, true, false, or unknown';\n end if;\n else\n perform realtime.cast(filter.value, col_type);\n end if;\n end loop;\n\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n ", "binary_path": null, "sql_body": null, - "definition": "CREATE OR REPLACE FUNCTION realtime.subscription_check_filters()\n RETURNS trigger\n LANGUAGE plpgsql\nAS $function$\n /*\n Validates that the user defined filters for a subscription:\n - refer to valid columns that the claimed role may access\n - values are coercable to the correct column type\n */\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n\n in_val jsonb;\n begin\n for filter in select * from unnest(new.filters) loop\n -- Filtered column is valid\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n -- Type is sanitized and safe for string interpolation\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n\n -- Set maximum number of entries for in filter\n if filter.op = 'in'::realtime.equality_op then\n in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);\n if coalesce(jsonb_array_length(in_val), 0) > 100 then\n raise exception 'too many values for `in` filter. Maximum 100';\n end if;\n else\n -- raises an exception if value is not coercable to type\n perform realtime.cast(filter.value, col_type);\n end if;\n\n end loop;\n\n -- Apply consistent order to filters so the unique constraint on\n -- (subscription_id, entity, filters) can't be tricked by a different filter order\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $function$\n", + "definition": "CREATE OR REPLACE FUNCTION realtime.subscription_check_filters()\n RETURNS trigger\n LANGUAGE plpgsql\nAS $function$\n declare\n col_names text[] = coalesce(\n array_agg(c.column_name order by c.ordinal_position),\n '{}'::text[]\n )\n from\n information_schema.columns c\n where\n format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity\n and pg_catalog.has_column_privilege(\n (new.claims ->> 'role'),\n format('%I.%I', c.table_schema, c.table_name)::regclass,\n c.column_name,\n 'SELECT'\n );\n filter realtime.user_defined_filter;\n col_type regtype;\n in_val jsonb;\n begin\n for filter in select * from unnest(new.filters) loop\n if not filter.column_name = any(col_names) then\n raise exception 'invalid column for filter %', filter.column_name;\n end if;\n\n col_type = (\n select atttypid::regtype\n from pg_catalog.pg_attribute\n where attrelid = new.entity\n and attname = filter.column_name\n );\n if col_type is null then\n raise exception 'failed to lookup type for column %', filter.column_name;\n end if;\n\n if filter.op in ('in'::realtime.equality_op, 'not_in'::realtime.equality_op) then\n in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);\n if coalesce(jsonb_array_length(in_val), 0) > 100 then\n raise exception 'too many values for `in`/`not_in` filter. Maximum 100';\n end if;\n elsif filter.op in ('is'::realtime.equality_op, 'not_is'::realtime.equality_op) then\n if filter.value not in ('null', 'true', 'false', 'unknown') then\n raise exception 'invalid value for is/not_is filter: must be null, true, false, or unknown';\n end if;\n else\n perform realtime.cast(filter.value, col_type);\n end if;\n end loop;\n\n new.filters = coalesce(\n array_agg(f order by f.column_name, f.op, f.value),\n '{}'\n ) from unnest(new.filters) f;\n\n return new;\n end;\n $function$\n", "config": null, "owner": "supabase_admin", "comment": null, diff --git a/test/e2e/realtime-check.ts b/test/e2e/realtime-check.ts index 9c3bbfb52..032e2dfc3 100644 --- a/test/e2e/realtime-check.ts +++ b/test/e2e/realtime-check.ts @@ -26,7 +26,7 @@ const program = new Command() .option("--json", "Output results as JSON to stdout") .option("--otel ", "OTLP HTTP endpoint for tracing (e.g. http://localhost:4318)") .option("--otel-token ", "Bearer token for authenticated OTLP endpoints") - .option("--test ", "Comma-separated list of test categories to run: functional,load,connection,load-postgres-changes,load-presence,load-broadcast,load-broadcast-from-db,load-broadcast-replay,broadcast,broadcast-replay,presence,authorization,postgres-changes,broadcast-changes") + .option("--test ", "Comma-separated list of test categories to run: functional,load,connection,load-postgres-changes,load-presence,load-broadcast,load-broadcast-from-db,load-broadcast-replay,broadcast,broadcast-replay,presence,authorization,postgres-changes,postgres-changes-filters,broadcast-changes") .parse(); const opts = program.opts(); @@ -332,6 +332,7 @@ async function setup(): Promise<{ userId: string; testUser: { email: string; pas payload jsonb NOT NULL DEFAULT '{}' )`), ]); + await runSql("pg_changes nullable_value column", sql`ALTER TABLE public.pg_changes ADD COLUMN IF NOT EXISTS nullable_value text`); log(kleur.dim(`setup: tables done (${(performance.now() - stepStart).toFixed(0)}ms)`)); stepStart = performance.now(); @@ -1207,6 +1208,215 @@ async function runPostgresChangesTests(testUser: { email: string; password: stri }); } +async function runPostgresChangesFiltersTests(testUser: { email: string; password: string }) { + suite("postgres-changes-filters"); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("like: delivers row matching pattern", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const sentinel = crypto.randomUUID().replace(/-/g, ""); + const value = `like-${sentinel}`; + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=like.like-${sentinel}` }, (p) => (result = p)); + + const { subscribeMs } = await openPostgresChannel(channel); + await executeInsert(supabase, "pg_changes", value); + await waitFor(() => result, "like event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.value, value); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("ilike: delivers row matching pattern case-insensitively", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const sentinel = crypto.randomUUID().replace(/-/g, ""); + const value = `ilike-${sentinel}`; + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=ilike.ILIKE-${sentinel.toUpperCase()}` }, (p) => (result = p)); + + const { subscribeMs } = await openPostgresChannel(channel); + await executeInsert(supabase, "pg_changes", value); + await waitFor(() => result, "ilike event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.value, value); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("in: delivers row whose value is in the list", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const tag = crypto.randomUUID().replace(/-/g, ""); + const values = [`inA${tag}`, `inB${tag}`, `inC${tag}`]; + const chosen = values[1]; + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=in.(${values.join(",")})` }, (p) => { if (p.new.value === chosen) result = p; }); + + const { subscribeMs } = await openPostgresChannel(channel); + await executeInsert(supabase, "pg_changes", chosen); + await waitFor(() => result, "in event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.value, chosen); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("not_in: delivers row whose value is outside the list", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const tag = crypto.randomUUID().replace(/-/g, ""); + const excluded = [`excl1${tag}`, `excl2${tag}`]; + const included = `incl${tag}`; + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=not.in.(${excluded.join(",")})` }, (p) => { if (p.new.value === included) result = p; }); + + const { subscribeMs } = await openPostgresChannel(channel); + await executeInsert(supabase, "pg_changes", included); + await waitFor(() => result, "not_in event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.value, included); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("not_like: delivers row that does not match pattern", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const tag = crypto.randomUUID().replace(/-/g, ""); + const value = `notlike-included-${tag}`; + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=not.like.notlike-excluded-%` }, (p) => { if (p.new.value === value) result = p; }); + + const { subscribeMs } = await openPostgresChannel(channel); + await executeInsert(supabase, "pg_changes", value); + await waitFor(() => result, "not_like event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.value, value); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("not_ilike: delivers row that does not match pattern case-insensitively", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const tag = crypto.randomUUID().replace(/-/g, ""); + const value = `notilike-included-${tag}`; + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: `value=not.ilike.NOTILIKE-EXCLUDED-%` }, (p) => { if (p.new.value === value) result = p; }); + + const { subscribeMs } = await openPostgresChannel(channel); + await executeInsert(supabase, "pg_changes", value); + await waitFor(() => result, "not_ilike event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.value, value); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("is.null: delivers row with null column value", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: "nullable_value=is.null" }, (p) => (result = p)); + + const { subscribeMs } = await openPostgresChannel(channel); + const { data, error } = await supabase.from("pg_changes").insert({ nullable_value: null }).select("id"); + if (error) throw new Error(`Insert failed: ${error.message}`); + const id = (data as { id: number }[])[0].id; + await waitFor(() => result, "is.null event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.id, id); + assert.strictEqual(result.new.nullable_value, null); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); + + await sleep(RATE_LIMIT_PAUSE_MS); + await test("not_is.null: delivers row with non-null column value", async () => { + const supabase = createClient(PROJECT_URL, ANON_KEY, { realtime: REALTIME_OPTS }); + try { + await signInUser(supabase, testUser.email, testUser.password); + const sentinel = crypto.randomUUID(); + let result: any = null; + + const channel = supabase + .channel(randomTopic(), BROADCAST_CONFIG) + .on("postgres_changes", { event: "INSERT", schema: "public", table: "pg_changes", filter: "nullable_value=not.is.null" }, (p) => { if (p.new.nullable_value === sentinel) result = p; }); + + const { subscribeMs } = await openPostgresChannel(channel); + const { data, error } = await supabase.from("pg_changes").insert({ nullable_value: sentinel }).select("id"); + if (error) throw new Error(`Insert failed: ${error.message}`); + const id = (data as { id: number }[])[0].id; + await waitFor(() => result, "not_is.null event"); + + assert.strictEqual(result.eventType, "INSERT"); + assert.strictEqual(result.new.id, id); + assert.strictEqual(result.new.nullable_value, sentinel); + return [{ label: "subscribe", value: subscribeMs, unit: "ms" }]; + } finally { + await stopClient(supabase); + } + }); +} + async function runBroadcastReplayTests(testUser: { email: string; password: string }) { suite("broadcast replay"); @@ -1355,6 +1565,7 @@ const SUITES: Record "presence": (u) => runPresenceTests(u), "authorization": (u) => runAuthorizationTests(u), "postgres-changes": (u) => runPostgresChangesTests(u), + "postgres-changes-filters": (u) => runPostgresChangesFiltersTests(u), "broadcast-changes": (u) => runBroadcastChangesTests(u), }; @@ -1369,6 +1580,7 @@ const DB_REQUIRED_SUITES = new Set([ "presence", "authorization", "postgres-changes", + "postgres-changes-filters", "broadcast-changes", ]); diff --git a/test/integration/rt_channel/postgres_changes_test.exs b/test/integration/rt_channel/postgres_changes_test.exs index b109cd673..2518749bc 100644 --- a/test/integration/rt_channel/postgres_changes_test.exs +++ b/test/integration/rt_channel/postgres_changes_test.exs @@ -506,6 +506,266 @@ defmodule Realtime.Integration.RtChannel.PostgresChangesTest do end end + describe "new operators (like, ilike, is, not.*)" do + defp subscribe_and_wait(socket, topic, filter) do + config = %{ + postgres_changes: [%{event: "INSERT", schema: "public", table: "test", filter: filter}] + } + + WebsocketClient.join(socket, topic, %{config: config}) + + assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, + 200 + + assert_receive %Message{ + event: "system", + payload: %{"message" => "Subscribed to PostgreSQL", "status" => "ok"} + }, + 8000 + end + + test "like: delivers matching row", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=like.%match%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('it-match-es') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "like: ignores non-matching row", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=like.%match%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('no-such-thing') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "like: is case-sensitive and ignores row that only matches case-insensitively", %{ + tenant: tenant, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=like.%MATCH%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('it-match-es') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "ilike: delivers row matching case-insensitively", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=ilike.%MATCH%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('it-match-es') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "ilike: ignores row that does not match even case-insensitively", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=ilike.%MATCH%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('no-such-thing') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "is.null: delivers row with null details", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=is.null") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values (null) returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "is.null: ignores row with non-null details", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=is.null") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('present') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "not.eq (neq): delivers non-matching row", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.eq.excluded") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('included') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "not.in: delivers row whose value is outside the list", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.in.(foo,bar)") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('baz') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "not.in: ignores row whose value is inside the list", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.in.(foo,bar)") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('foo') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "not.is.null: delivers row with non-null details", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.is.null") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('present') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "not.is.null: ignores row with null details", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.is.null") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values (null) returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "not.like: delivers row that does not match the pattern", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.like.%excluded%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('included') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "not.like: ignores row that matches the pattern", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.like.%excluded%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('excluded-value') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "not.ilike: delivers row that does not match the pattern case-insensitively", %{ + tenant: tenant, + serializer: serializer + } do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.ilike.%EXCLUDED%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + %{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('included') returning id", []) + + assert_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"record" => %{"id" => ^id}, "type" => "INSERT"}} + }, + 500 + end + + test "not.ilike: ignores row that matches the pattern case-insensitively", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.ilike.%EXCLUDED%") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + Postgrex.query!(conn, "insert into test (details) values ('excluded-value') returning id", []) + + refute_receive %Message{event: "postgres_changes", payload: %{"data" => %{"type" => "INSERT"}}, topic: ^topic}, + 500 + end + + test "not.in: ignores each value that appears in the exclusion list", %{tenant: tenant, serializer: serializer} do + {socket, _} = get_connection(tenant, serializer) + topic = "realtime:any" + subscribe_and_wait(socket, topic, "details=not.in.(alpha,beta,gamma)") + + {:ok, _, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + + Enum.each(["alpha", "beta", "gamma"], fn val -> + Postgrex.query!(conn, "insert into test (details) values ('#{val}') returning id", []) + + refute_receive %Message{ + event: "postgres_changes", + payload: %{"data" => %{"type" => "INSERT"}}, + topic: ^topic + }, + 300 + end) + end + end + describe "error handling" do test "error subscribing", %{tenant: tenant, serializer: serializer} do {:ok, conn} = Database.connect(tenant, "realtime_test") diff --git a/test/realtime/extensions/cdc_rls/replication_poller_test.exs b/test/realtime/extensions/cdc_rls/replication_poller_test.exs index b5ae73535..2cbafb99b 100644 --- a/test/realtime/extensions/cdc_rls/replication_poller_test.exs +++ b/test/realtime/extensions/cdc_rls/replication_poller_test.exs @@ -157,6 +157,45 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do assert_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, %{tenant: ^tenant_id}}, 2000 end + test "notifies subscribers and does not retry on raise_exception from check_equality_op", %{args: args} do + tenant_id = args["id"] + + error = + {:error, + %Postgrex.Error{ + postgres: %{ + code: :raise_exception, + message: "unsupported equality operator: future_op" + } + }} + + expect(Replications, :list_changes, fn _, _, _, _, _ -> error end) + + # Insert the test process as a subscriber so it receives the fatal error notification + :ets.insert(args["subscribers_pids_table"], {self(), "some-sub-id", make_ref(), node()}) + + start_link_supervised!({Poller, args}) + + assert_receive { + :telemetry, + [:realtime, :replication, :poller, :query, :stop], + %{duration: _}, + %{tenant: ^tenant_id} + }, + 1000 + + assert_receive {:subscription_fatal_error, "unsupported equality operator: future_op"}, 500 + + # No retry — the poller must not schedule another poll + refute_receive { + :telemetry, + [:realtime, :replication, :poller, :query, :stop], + %{duration: _}, + %{tenant: ^tenant_id} + }, + 500 + end + test "handles no new changes", %{args: args, tenant: tenant} do tenant_id = args["id"] reject(&TenantBroadcaster.pubsub_direct_broadcast/6) diff --git a/test/realtime/extensions/cdc_rls/subscriptions_test.exs b/test/realtime/extensions/cdc_rls/subscriptions_test.exs index ef93787c6..4e26e5236 100644 --- a/test/realtime/extensions/cdc_rls/subscriptions_test.exs +++ b/test/realtime/extensions/cdc_rls/subscriptions_test.exs @@ -43,7 +43,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do Subscriptions.parse_subscription_params(%{ "schema" => "public", "table" => "test", - "filter" => "id=gt.0,id=like.100" + "filter" => "id=gt.0,id=fts.100" }) assert msg =~ "Error parsing `filter` params" @@ -158,6 +158,28 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do assert msg =~ "empty segments" end + test "comma inside a plain eq value is treated as an AND separator and fails" do + assert {:error, msg} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=eq.hello,world" + }) + + assert msg =~ "Error parsing `filter` params" + end + + test "postgrest-style quoted string with comma in eq value is treated as a literal" do + assert {:ok, {"*", "public", "test", filters}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => ~s(details=eq."hello,world") + }) + + assert [{"details", "eq", "hello,world"}] = filters + end + # Fix 3: whitespace-only filter is treated as no filter, same as "" test "whitespace-only filter produces an empty filter list" do assert {:ok, {"*", "public", "test", []}} = @@ -167,6 +189,163 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do "filter" => " " }) end + + test "in filter with quoted string values containing commas is preserved" do + assert {:ok, {"*", "public", "test", filters}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => ~s[name=in.("hi,there","yes,you")] + }) + + assert [{"name", "in", ~s[{"hi,there","yes,you"}]}] = filters + end + end + + describe "parse_subscription_params/1 with new operators and not. prefix" do + test "like operator parses correctly" do + assert {:ok, {"*", "public", "test", [{"name", "like", "%hello%"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "name=like.%hello%" + }) + end + + test "ilike operator parses correctly" do + assert {:ok, {"*", "public", "test", [{"name", "ilike", "%hello%"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "name=ilike.%hello%" + }) + end + + test "is operator parses correctly" do + assert {:ok, {"*", "public", "test", [{"deleted_at", "is", "null"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "deleted_at=is.null" + }) + end + + test "not.eq maps to neq" do + assert {:ok, {"*", "public", "test", [{"id", "neq", "5"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not.eq.5" + }) + end + + test "not.lt maps to gte" do + assert {:ok, {"*", "public", "test", [{"id", "gte", "5"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not.lt.5" + }) + end + + test "not.in maps to not_in and formats value as array" do + assert {:ok, {"*", "public", "test", [{"id", "not_in", "{1,2,3}"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not.in.(1,2,3)" + }) + end + + test "not.like maps to not_like" do + assert {:ok, {"*", "public", "test", [{"name", "not_like", "%hello%"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "name=not.like.%hello%" + }) + end + + test "not.ilike maps to not_ilike" do + assert {:ok, {"*", "public", "test", [{"name", "not_ilike", "%hello%"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "name=not.ilike.%hello%" + }) + end + + test "not.is maps to not_is" do + assert {:ok, {"*", "public", "test", [{"deleted_at", "not_is", "null"}]}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "deleted_at=not.is.null" + }) + end + + test "not. with unsupported operator returns error" do + assert {:error, msg} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not.fts.hello" + }) + + assert msg =~ "Error parsing `filter` params" + end + + test "not. with unsupported operator does not double-prefix the error message" do + assert {:error, msg} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not.fts.hello" + }) + + refute msg =~ "Error parsing `filter` params: Error parsing" + end + + test "not. with missing value (no dot-separated value) does not double-prefix the error message" do + assert {:error, msg} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not.eq" + }) + + assert msg =~ "Error parsing `filter` params" + refute msg =~ "Error parsing `filter` params: Error parsing" + end + + test "not_in used directly without not. prefix is rejected" do + assert {:error, _msg} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=not_in.(1,2,3)" + }) + end + + test "not_like used directly without not. prefix is rejected" do + assert {:error, _msg} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "name=not_like.%foo%" + }) + end + + test "not. operators combine correctly in AND filters" do + assert {:ok, {"*", "public", "test", filters}} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "status=not.eq.banned,deleted_at=is.null" + }) + + assert [{"status", "neq", "banned"}, {"deleted_at", "is", "null"}] = filters + end end describe "create/5" do @@ -308,6 +487,126 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do Subscriptions.parse_subscription_params(%{"schema" => "public", "table" => "images", "filter" => [123]}) end + test "like filter on text column is accepted", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=like.%hello%" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + end + + test "like filter on integer column is rejected due to type mismatch", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "id=like.%hello%" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:error, {:subscription_insert_failed, msg}} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + + assert msg =~ "invalid input syntax for type integer" + end + + test "ilike filter on text column is accepted", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=ilike.%hello%" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + end + + test "is.null filter is accepted", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=is.null" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + end + + test "is filter with invalid value is rejected", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=is.other" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:error, {:subscription_insert_failed, msg}} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + + assert msg =~ "invalid value for is/not_is filter" + end + + test "not.in filter on text column is accepted", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=not.in.(foo,bar)" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + end + + test "not.in filter exceeding 100 values is rejected", %{conn: conn} do + values = Enum.map_join(1..101, ",", &"val#{&1}") + + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=not.in.(#{values})" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:error, {:subscription_insert_failed, msg}} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + + assert msg =~ "too many values" + end + + test "not.is.null filter is accepted", %{conn: conn} do + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "schema" => "public", + "table" => "test", + "filter" => "details=not.is.null" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + + assert {:ok, [%Postgrex.Result{}]} = + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) + end + test "create with two comma-separated filters stores a two-element filter array", %{conn: conn} do {:ok, subscription_params} = Subscriptions.parse_subscription_params(%{ diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 667f60078..9615a3804 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -1534,6 +1534,29 @@ defmodule RealtimeWeb.RealtimeChannelTest do end end + describe "subscription fatal errors" do + test "subscription_fatal_error pushes system error to client and stops the channel", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt)) + assert %Socket{channel_pid: channel_pid} = subscribe_and_join!(socket, "realtime:test", %{}) + + send(channel_pid, {:subscription_fatal_error, "unsupported equality operator: bad_op"}) + + assert_receive %Socket.Message{ + topic: "realtime:test", + event: "system", + payload: %{ + message: "unsupported equality operator: bad_op", + status: "error", + extension: "system", + channel: "test" + } + } + + assert_process_down(channel_pid) + end + end + defp assert_process_down(pid) do ref = Process.monitor(pid) assert_receive {:DOWN, ^ref, :process, ^pid, _reason}