Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 96 additions & 6 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,52 @@
defmodule Extensions.PostgresCdcRls.ReplicationPoller do
@moduledoc """
Polls the write ahead log, applies row level sucurity policies for each subscriber
and broadcast records to the `MessageDispatcher`.
Polls the write-ahead log via a temporary logical replication slot, applies row
level security policies for each subscriber, and broadcasts records to the
`MessageDispatcher`.

## Lifecycle

On start the poller connects to the tenant's database, calls
`Replications.prepare_replication/2` to create the temporary slot, and fetches
the publication's tables via `Subscriptions.fetch_publication_tables/2`. If the
publication has tables, it kicks off the poll loop; if not, it stays idle and
waits for tables to appear.

## Poll loop

Each `:poll` calls `Replications.list_changes/5`, which drains the slot and
fans changes out to subscriber nodes. Reschedule cadence depends on activity:

* rows processed → poll again immediately,
* raw slot changes present but nothing for subscribers → poll after
`poll_interval_ms` (+ jitter),
* fully idle → back off to `poll_interval_ms * @idle_multiplier`.

When the publication is empty, `:poll` is a no-op — there are no tables to
decode, so the slot is not advanced.

## Reacting to publication changes

Every `@check_oids_interval` ms the poller re-fetches the publication's oids:

* tables appear (empty → non-empty): re-run `prepare_replication/1` to
recreate the slot if it was dropped, then resume polling.
* tables vanish (non-empty → empty): cancel pending polls and drop the
replication slot via `Replications.drop_replication_slot/2`. If the drop
fails for any reason other than `:slot_not_found`, the poller stops with
`{:shutdown, :drop_replication_slot_failed}`; because the slot is
temporary, Postgres releases it automatically when the DB connection
ends.

This mirrors `SubscriptionManager`'s own `:check_oids` loop, which manages
subscriptions when the publication's tables change.
"""

use GenServer
use Realtime.Logs

@idle_multiplier 5
@check_oids_interval 60_000

# Column order returned by realtime.list_changes/4 (see Replications.list_changes/5
# and the SQL function in
Expand All @@ -22,6 +61,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

alias Extensions.PostgresCdcRls.MessageDispatcher
alias Extensions.PostgresCdcRls.Replications
alias Extensions.PostgresCdcRls.Subscriptions

alias Realtime.Adapters.Changes.DeletedRecord
alias Realtime.Adapters.Changes.NewRecord
Expand Down Expand Up @@ -58,7 +98,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
slot_name: extension["slot_name"] <> slot_name_suffix(),
tenant_id: tenant_id,
rate_counter_args: rate_counter_args,
subscribers_nodes_table: args["subscribers_nodes_table"]
subscribers_nodes_table: args["subscribers_nodes_table"],
oids: %{},
check_oid_ref: nil
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
Expand All @@ -82,6 +124,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
end

@impl true
def handle_info(:poll, %{oids: oids, poll_ref: poll_ref} = state) when map_size(oids) == 0 do
cancel_timer(poll_ref)
{:noreply, %{state | poll_ref: nil}}
end

def handle_info(
:poll,
%{
Expand Down Expand Up @@ -169,6 +216,35 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{:noreply, prepare_replication(state)}
end

def handle_info(:check_oids, %{conn: conn, publication: publication, oids: old_oids} = state) do
new_oids = Subscriptions.fetch_publication_tables(conn, publication)

case {map_size(old_oids), map_size(new_oids)} do
{0, n} when n > 0 ->
# prepare_replication/1 cancels check_oid_ref and reschedules it on success.
{:noreply, prepare_replication(%{state | oids: new_oids})}

{n, 0} when n > 0 ->
cancel_timer(state.poll_ref)

case Replications.drop_replication_slot(conn, state.slot_name) do
{:error, reason} when reason != :slot_not_found ->
# The slot is a temporary logical replication slot tied to this connection,
# so stopping the process releases it without leaking WAL.
log_error("DropReplicationSlotFailed", reason)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing entry in ERROR_CODES.md :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:tableflip:

{:stop, {:shutdown, :drop_replication_slot_failed}, state}

_ ->
cancel_timer(state.check_oid_ref)
{:noreply, %{state | oids: new_oids, poll_ref: nil, check_oid_ref: schedule_check_oids()}}
end
Comment on lines +227 to +240

_ ->
cancel_timer(state.check_oid_ref)
{:noreply, %{state | oids: new_oids, check_oid_ref: schedule_check_oids()}}
end
end

def slot_name_suffix do
case Application.get_env(:realtime, :slot_name_suffix) do
nil -> ""
Expand All @@ -180,11 +256,23 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

defp convert_errors(_), do: nil

defp prepare_replication(%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state) do
defp prepare_replication(
%{
backoff: backoff,
conn: conn,
slot_name: slot_name,
Comment on lines +259 to +263
publication: publication,
retry_count: retry_count,
check_oid_ref: check_oid_ref
} = state
) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
send(self(), :poll)
state
oids = Subscriptions.fetch_publication_tables(conn, publication)
if map_size(oids) > 0, do: send(self(), :poll)

cancel_timer(check_oid_ref)
%{state | oids: oids, check_oid_ref: schedule_check_oids()}

{:error, error} ->
log_error("PoolingReplicationPreparationError", error)
Expand All @@ -195,6 +283,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
end
end

defp schedule_check_oids, do: Process.send_after(self(), :check_oids, @check_oids_interval)

defp record_list_changes_telemetry(time, tenant_id) do
Realtime.Telemetry.execute(
[:realtime, :replication, :poller, :query, :stop],
Expand Down
14 changes: 14 additions & 0 deletions lib/extensions/postgres_cdc_rls/replications.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ defmodule Extensions.PostgresCdcRls.Replications do
end
end

@spec drop_replication_slot(pid(), String.t()) ::
{:ok, :dropped} | {:error, :slot_not_found | Postgrex.Error.t()}
def drop_replication_slot(conn, slot_name) do
case query(
conn,
"select pg_drop_replication_slot(slot_name) from pg_replication_slots where slot_name = $1",
[slot_name]
) do
{:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :slot_not_found}
{:ok, _} -> {:ok, :dropped}
{:error, error} -> {:error, error}
end
end

@spec get_pg_stat_activity_diff(pid(), integer()) ::
{:ok, integer()} | {:error, Postgrex.Error.t()}
def get_pg_stat_activity_diff(conn, db_pid) do
Expand Down
71 changes: 71 additions & 0 deletions test/realtime/extensions/cdc_rls/cdc_rls_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
setup :set_mimic_global

alias Extensions.PostgresCdcRls
alias Extensions.PostgresCdcRls.ReplicationPoller
alias Extensions.PostgresCdcRls.Subscriptions
alias PostgresCdcRls.SubscriptionManager
alias Postgrex
Expand Down Expand Up @@ -106,6 +107,76 @@ defmodule Realtime.Extensions.CdcRlsTest do
assert !Map.equal?(oids2, oids3)
end

test "Replication poller toggles slot when publication tables come and go", %{tenant: tenant} do
# setup/0 already received the "ready" event, which fires only after the poller's init/1
# (and its Registry.register) has run. :sys.get_state below then blocks until the poller
# finishes handle_continue and has its slot prepared.
[{poller_pid, _}] = Registry.lookup(ReplicationPoller.Registry, tenant.external_id)

# Use the SubscriptionManager pub connection to drive publication state from the test —
# the poller's own conn is owned by the poller process.
{:ok, _manager_pid, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id)

%{oids: initial_oids, slot_name: slot_name} = :sys.get_state(poller_pid)
refute initial_oids == %{}

assert %Postgrex.Result{rows: [[1]]} =
Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name])

# Drop the publication: poller should drop its slot and clear oids.
Postgrex.query!(conn, "drop publication if exists supabase_realtime_test", [])
send(poller_pid, :check_oids)
%{oids: oids_after_drop, poll_ref: poll_ref_after_drop} = :sys.get_state(poller_pid)
assert oids_after_drop == %{}
assert poll_ref_after_drop == nil

assert %Postgrex.Result{rows: [[0]]} =
Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name])

# Re-create the publication: poller should recreate the slot and repopulate oids.
Postgrex.query!(conn, "create publication supabase_realtime_test for all tables", [])
send(poller_pid, :check_oids)
%{oids: oids_after_create} = :sys.get_state(poller_pid)
refute oids_after_create == %{}

assert %Postgrex.Result{rows: [[1]]} =
Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name])
end

test "Replication poller toggles slot when tables are removed from the publication", %{tenant: tenant} do
[{poller_pid, _}] = Registry.lookup(ReplicationPoller.Registry, tenant.external_id)
{:ok, _manager_pid, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id)

%{oids: initial_oids, slot_name: slot_name} = :sys.get_state(poller_pid)
refute initial_oids == %{}

assert %Postgrex.Result{rows: [[1]]} =
Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name])

# Publication still exists but has no tables (recreated without FOR ALL TABLES
# since you can't ALTER ... DROP TABLE on a FOR ALL TABLES publication).
Postgrex.query!(conn, "drop publication if exists supabase_realtime_test", [])
Postgrex.query!(conn, "create publication supabase_realtime_test", [])

send(poller_pid, :check_oids)
%{oids: oids_after_empty, poll_ref: poll_ref_after_empty} = :sys.get_state(poller_pid)
assert oids_after_empty == %{}
assert poll_ref_after_empty == nil

assert %Postgrex.Result{rows: [[0]]} =
Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name])

# Add a table back to the publication: poller should recreate the slot and repopulate oids.
Postgrex.query!(conn, "alter publication supabase_realtime_test add table public.test", [])

send(poller_pid, :check_oids)
%{oids: oids_after_add} = :sys.get_state(poller_pid)
refute oids_after_add == %{}

assert %Postgrex.Result{rows: [[1]]} =
Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name])
end

test "Stop tenant supervisor", %{tenant: tenant} do
sup =
Enum.reduce_while(1..10, nil, fn _, acc ->
Expand Down
Loading
Loading