Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions FEATURES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Realtime Features

Realtime exposes three features over WebSockets. All three share the same connection and channel model: a client opens a single WebSocket, then joins one or more named topics (channels). Each feature is opted into per channel via the `config` payload at join time.

---

## Broadcast

Send ephemeral messages between clients subscribed to the same topic with low latency.

### How it works

1. A client sends a `broadcast` event to a channel topic.
2. The server validates the payload size against the tenant's `max_payload_size_in_kb` limit.
3. On private channels, RLS write policies are checked before the message is forwarded.
4. The message is dispatched to all other subscribers on the topic. By default, the sender is excluded.

### Configuration

Set at channel join time under `config.broadcast`:

| Key | Type | Description |
|-----|------|-------------|
| `self` | boolean | When `true`, the sender also receives its own broadcast messages. Default `false`. |
| `ack` | boolean | When `true`, the server replies with `:ok` after dispatching. Default `false` (fire-and-forget). |

### Constraints

- Payload size is capped at the tenant's `max_payload_size_in_kb` (with a ±500 byte margin).
- Rate is capped at the tenant's configured events-per-second limit. Exceeding it disconnects the client.
- On private channels, if the write policy denies access, the message is silently dropped.

---

## Presence

Track and synchronize which clients are connected, along with arbitrary metadata.

### How it works

1. A client sends a `presence` event with `track` or `untrack` action and an optional payload map.
2. `track` creates or updates the client's presence entry in a distributed ETS-backed store (Phoenix.Presence).
3. All subscribers on the topic receive a `presence_diff` message with the join/leave delta.
4. New joiners receive a `presence_state` message with the full current state for that topic.
5. `untrack` removes the entry immediately.

Presence state is keyed per client (UUID by default, or a caller-provided string via `config.presence.key`). Duplicate `track` calls with the same payload are no-ops.

### Configuration

Set at channel join time under `config.presence`:

| Key | Type | Description |
|-----|------|-------------|
| `key` | string | Custom key to identify this client's presence entry. Defaults to a generated UUID. |

### Constraints

- Payload must be a JSON object (map).
- Server-side rate limit: `max_presence_events_per_second` per tenant.
- Client-side rate limit: 5 events per 30-second window by default.
- Payload size is subject to the same `max_payload_size_in_kb` limit as Broadcast.
- On private channels, RLS read and write policies are enforced.

---

## Postgres Changes

Listen to INSERT, UPDATE, and DELETE events on Postgres tables and receive them in real time.

### How it works

1. At channel join, the client declares one or more subscriptions in `config.postgres_changes`. Each subscription specifies an event type, a table, and optional filters.
2. The server creates a `realtime.subscription` row in the tenant database for each subscription, storing the filters alongside the subscriber identity.
3. A replication poller reads changes from a logical replication slot (WAL) on a configurable interval.
4. For each WAL record, the tenant database runs `realtime.apply_rls()`, which:
- Checks RLS policies for each subscribing role.
- Runs `realtime.is_visible_through_filters()` to match the row against each subscription's filters.
5. Matching subscription IDs are returned alongside the change and dispatched to the appropriate clients.

### Subscription parameters

| Key | Required | Description |
|-----|----------|-------------|
| `event` | Yes | `INSERT`, `UPDATE`, `DELETE`, or `*` for all three. |
| `schema` | Yes | Database schema name (e.g. `public`). |
| `table` | No | Table name. Omit to listen to all tables in the schema. |
| `filter` | No | Filter expression (see below). |

### Filters

Filters limit which change events are delivered to a subscriber. They are evaluated inside the tenant database using the subscriber's role, so column-level privileges and RLS policies are always respected.

#### Syntax

A single filter has the form:

```
column=operator.value
```

Multiple filters are separated by commas. All conditions must be true for a change to be delivered (AND semantics):

```
col1=operator.value,col2=operator.value
```

Values that contain a comma or whitespace must be wrapped in double quotes:

```
col="some value, with comma"
```

#### Comparison operators

These operators cast both sides to the column's declared type before comparing.

| Operator | SQL | Description |
|----------|-----|-------------|
| `eq` | `=` | Column equals value. |
| `neq` | `!=` | Column does not equal value. |
| `lt` | `<` | Column is less than value. |
| `lte` | `<=` | Column is less than or equal to value. |
| `gt` | `>` | Column is greater than value. |
| `gte` | `>=` | Column is greater than or equal to value. |

#### List operators

The value is a parenthesised, comma-separated list: `(val1,val2,val3)`.

| Operator | SQL | Description |
|----------|-----|-------------|
| `in` | `= ANY(array)` | Column value appears in the list. |
| `not_in` | `!= ALL(array)` | Column value does not appear in the list. |

Maximum 100 values per list. The list is cast to an array of the column's type.

#### Pattern matching operators

Standard SQL pattern syntax applies: `%` matches any sequence of characters, `_` matches any single character.

| Operator | SQL | Description |
|----------|-----|-------------|
| `like` | `LIKE` | Column matches pattern (case-sensitive). |
| `ilike` | `ILIKE` | Column matches pattern (case-insensitive). |
| `not_like` | `NOT LIKE` | Column does not match pattern (case-sensitive). |
| `not_ilike` | `NOT ILIKE` | Column does not match pattern (case-insensitive). |

#### Identity operators

These operators use SQL `IS` / `IS NOT`, which handle `NULL` correctly. The value must be one of the four SQL keywords: `null`, `true`, `false`, `unknown`.

| Operator | SQL | Accepted values |
|----------|-----|-----------------|
| `is` | `IS` | `null`, `true`, `false`, `unknown` |
| `not_is` | `IS NOT` | `null`, `true`, `false`, `unknown` |

Unlike `eq`, these operators do not cast the right-hand side to the column type — they use the SQL keyword directly. This is the only correct way to test for `NULL`: `col=eq.null` translates to `col = NULL` which always yields `NULL` in SQL and never matches; `col=is.null` translates to `col IS NULL` and works as expected.

#### AND composition

All filters on a subscription are combined with AND. There is no OR operator at the filter level; OR conditions require separate subscriptions.

### DELETE events

For DELETE events, filters are applied against the old row values (the record as it existed before deletion), since the row no longer exists in the table.

### Constraints

- The subscribing role must have `SELECT` privilege on any column used in a filter.
- Filter values must be coercible to the column's type. Invalid values are rejected at subscription creation.
- `in` and `not_in` accept a maximum of 100 values.
- `is` and `not_is` only accept `null`, `true`, `false`, or `unknown`.
- RLS policies on the table are enforced — subscribers only receive rows they are permitted to read.

---

## Connection model

All three features share the same connection layer:

- One WebSocket per client.
- Up to `max_channels_per_client` topics per connection (default 100).
- JWT required on connect; re-validated every 5 minutes. Expired tokens disconnect the channel immediately.
- Token can be refreshed mid-connection by sending an `access_token` message on the socket.
13 changes: 12 additions & 1 deletion lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
slot_name: extension["slot_name"] <> slot_name_suffix(),
tenant_id: tenant_id,
rate_counter_args: rate_counter_args,
subscribers_nodes_table: args["subscribers_nodes_table"]
subscribers_nodes_table: args["subscribers_nodes_table"],
subscribers_pids_table: args["subscribers_pids_table"]
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
Expand Down Expand Up @@ -97,6 +98,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
conn: conn,
tenant_id: tenant_id,
subscribers_nodes_table: subscribers_nodes_table,
subscribers_pids_table: subscribers_pids_table,
rate_counter_args: rate_counter_args
} = state
) do
Expand Down Expand Up @@ -153,6 +155,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

{:noreply, %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}}

{:error, %Postgrex.Error{postgres: %{code: :raise_exception, message: msg}}} ->
log_error("SubscriptionFatalError", msg)

for {pid, _id, _ref, _node} <- :ets.tab2list(subscribers_pids_table) do
send(pid, {:subscription_fatal_error, msg})
end

{:noreply, state}

{:error, reason} ->
log_error("PoolingReplicationError", reason)

Expand Down
97 changes: 78 additions & 19 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,31 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
%{id: binary, claims: map, subscription_params: subscription_params}
]

@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
@filter_types [
"eq",
"neq",
"lt",
"lte",
"gt",
"gte",
"in",
"like",
"ilike",
"is"
]

@not_op_map %{
"eq" => "neq",
"neq" => "eq",
"lt" => "gte",
"lte" => "gt",
"gt" => "lte",
"gte" => "lt",
"in" => "not_in",
"like" => "not_like",
"ilike" => "not_ilike",
"is" => "not_is"
}

@spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
{:ok, Postgrex.Result.t()}
Expand Down Expand Up @@ -200,7 +224,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
@doc """
Parses subscription filter parameters into something we can pass into our `create_subscription` query.

We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in', 'like', 'ilike', 'is'.
Negated variants are expressed with the `not.` prefix: `not.eq`, `not.lt`, `not.in`, `not.like`, `not.ilike`, `not.is`, etc.

Multiple filters can be combined with commas and are applied as AND conditions:
`"col1=eq.val,col2=gt.5"` means `col1 = val AND col2 > 5`.
Expand Down Expand Up @@ -245,8 +270,8 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do

An unsupported filter will respond with an error tuple:

iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
{:error, ~s(Error parsing `filter` params: ["like", "hey"])}
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=fts.hey"})
{:error, ~s(Error parsing `filter` params: ["fts", "hey"])}

Catch `undefined` filters:

Expand Down Expand Up @@ -312,36 +337,41 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
defp parse_filters(filter) when is_binary(filter) do
case String.trim(filter) do
"" -> {:ok, []}
trimmed -> scan(trimmed, trimmed, 0, 0, 0, [])
trimmed -> scan(trimmed, trimmed, 0, 0, 0, false, [])
end
end

# Reached end of binary — parse the final segment
defp scan(<<>>, orig, start, len, _depth, acc) do
defp scan(<<>>, orig, start, len, _depth, _quoted, acc) do
case parse_segment(binary_part(orig, start, len)) do
{:ok, parsed} -> {:ok, Enum.reverse([parsed | acc])}
{:error, _} = e -> e
end
end

defp scan(<<"(", rest::binary>>, orig, start, len, depth, acc) do
scan(rest, orig, start, len + 1, depth + 1, acc)
# Toggle quoted mode on double-quote; parens and commas have no special meaning while quoted
defp scan(<<"\"", rest::binary>>, orig, start, len, depth, quoted, acc) do
scan(rest, orig, start, len + 1, depth, not quoted, acc)
end

defp scan(<<")", rest::binary>>, orig, start, len, depth, acc) do
scan(rest, orig, start, len + 1, max(0, depth - 1), acc)
defp scan(<<"(", rest::binary>>, orig, start, len, depth, false = quoted, acc) do
scan(rest, orig, start, len + 1, depth + 1, quoted, acc)
end

# Comma at depth 0 — segment boundary
defp scan(<<",", rest::binary>>, orig, start, len, 0, acc) do
defp scan(<<")", rest::binary>>, orig, start, len, depth, false = quoted, acc) do
scan(rest, orig, start, len + 1, max(0, depth - 1), quoted, acc)
end

# Comma at depth 0 and not inside quotes — segment boundary
defp scan(<<",", rest::binary>>, orig, start, len, 0, false, acc) do
case parse_segment(binary_part(orig, start, len)) do
{:ok, parsed} -> scan(rest, orig, start + len + 1, 0, 0, [parsed | acc])
{:ok, parsed} -> scan(rest, orig, start + len + 1, 0, 0, false, [parsed | acc])
{:error, _} = e -> e
end
end

defp scan(<<_::8, rest::binary>>, orig, start, len, depth, acc) do
scan(rest, orig, start, len + 1, depth, acc)
defp scan(<<_::8, rest::binary>>, orig, start, len, depth, quoted, acc) do
scan(rest, orig, start, len + 1, depth, quoted, acc)
end

defp parse_segment(segment) do
Expand All @@ -351,8 +381,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do

trimmed ->
with [col, rest] <- String.split(trimmed, "=", parts: 2),
[filter_type, value] when filter_type in @filter_types <-
String.split(rest, ".", parts: 2),
{:ok, filter_type, value} <- parse_filter_rest(rest),
{:ok, formatted_value} <- format_filter_value(filter_type, value) do
{:ok, {col, filter_type, formatted_value}}
else
Expand All @@ -362,13 +391,43 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
end
end

defp format_filter_value("in", value) do
defp parse_filter_rest("not." <> rest) do
case String.split(rest, ".", parts: 2) do
[raw_op, value] ->
case @not_op_map[raw_op] do
nil -> {:error, "not.#{raw_op} is not a supported operator"}
op -> {:ok, op, value}
end

_ ->
{:error, inspect("not." <> rest)}
end
end

defp parse_filter_rest(rest) do
case String.split(rest, ".", parts: 2) do
[filter_type, value] when filter_type in @filter_types -> {:ok, filter_type, value}
other -> {:error, inspect(other)}
end
end

defp format_filter_value(op, value) when op in ["in", "not_in"] do
size = byte_size(value)

if size >= 2 and binary_part(value, 0, 1) == "(" and binary_part(value, size - 1, 1) == ")" do
{:ok, "{" <> binary_part(value, 1, size - 2) <> "}"}
else
{:error, "`in` filter value must be wrapped by parentheses"}
{:error, "`#{op}` filter value must be wrapped by parentheses"}
end
end

defp format_filter_value(_filter, "\"" <> rest) do
size = byte_size(rest)

if size >= 1 and binary_part(rest, size - 1, 1) == "\"" do
{:ok, binary_part(rest, 0, size - 1)}
else
{:error, "unmatched double-quote in filter value"}
end
end

Expand Down
Loading
Loading