diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 7fdf191a2..a58f5b309 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -1,13 +1,52 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do @moduledoc """ - Polls the write ahead log, applies row level sucurity policies for each subscriber - and broadcast records to the `MessageDispatcher`. + Polls the write-ahead log via a temporary logical replication slot, applies row + level security policies for each subscriber, and broadcasts records to the + `MessageDispatcher`. + + ## Lifecycle + + On start the poller connects to the tenant's database, calls + `Replications.prepare_replication/2` to create the temporary slot, and fetches + the publication's tables via `Subscriptions.fetch_publication_tables/2`. If the + publication has tables, it kicks off the poll loop; if not, it stays idle and + waits for tables to appear. + + ## Poll loop + + Each `:poll` calls `Replications.list_changes/5`, which drains the slot and + fans changes out to subscriber nodes. Reschedule cadence depends on activity: + + * rows processed → poll again immediately, + * raw slot changes present but nothing for subscribers → poll after + `poll_interval_ms` (+ jitter), + * fully idle → back off to `poll_interval_ms * @idle_multiplier`. + + When the publication is empty, `:poll` is a no-op — there are no tables to + decode, so the slot is not advanced. + + ## Reacting to publication changes + + Every `@check_oids_interval` ms the poller re-fetches the publication's oids: + + * tables appear (empty → non-empty): re-run `prepare_replication/1` to + recreate the slot if it was dropped, then resume polling. + * tables vanish (non-empty → empty): cancel pending polls and drop the + replication slot via `Replications.drop_replication_slot/2`. If the drop + fails for any reason other than `:slot_not_found`, the poller stops with + `{:shutdown, :drop_replication_slot_failed}`; because the slot is + temporary, Postgres releases it automatically when the DB connection + ends. + + This mirrors `SubscriptionManager`'s own `:check_oids` loop, which manages + subscriptions when the publication's tables change. """ use GenServer use Realtime.Logs @idle_multiplier 5 + @check_oids_interval 60_000 # Column order returned by realtime.list_changes/4 (see Replications.list_changes/5 # and the SQL function in @@ -22,6 +61,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do alias Extensions.PostgresCdcRls.MessageDispatcher alias Extensions.PostgresCdcRls.Replications + alias Extensions.PostgresCdcRls.Subscriptions alias Realtime.Adapters.Changes.DeletedRecord alias Realtime.Adapters.Changes.NewRecord @@ -58,7 +98,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do slot_name: extension["slot_name"] <> slot_name_suffix(), tenant_id: tenant_id, rate_counter_args: rate_counter_args, - subscribers_nodes_table: args["subscribers_nodes_table"] + subscribers_nodes_table: args["subscribers_nodes_table"], + oids: %{}, + check_oid_ref: nil } {:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{}) @@ -82,6 +124,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do end @impl true + def handle_info(:poll, %{oids: oids, poll_ref: poll_ref} = state) when map_size(oids) == 0 do + cancel_timer(poll_ref) + {:noreply, %{state | poll_ref: nil}} + end + def handle_info( :poll, %{ @@ -169,6 +216,35 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:noreply, prepare_replication(state)} end + def handle_info(:check_oids, %{conn: conn, publication: publication, oids: old_oids} = state) do + new_oids = Subscriptions.fetch_publication_tables(conn, publication) + + case {map_size(old_oids), map_size(new_oids)} do + {0, n} when n > 0 -> + # prepare_replication/1 cancels check_oid_ref and reschedules it on success. + {:noreply, prepare_replication(%{state | oids: new_oids})} + + {n, 0} when n > 0 -> + cancel_timer(state.poll_ref) + + case Replications.drop_replication_slot(conn, state.slot_name) do + {:error, reason} when reason != :slot_not_found -> + # The slot is a temporary logical replication slot tied to this connection, + # so stopping the process releases it without leaking WAL. + log_error("DropReplicationSlotFailed", reason) + {:stop, {:shutdown, :drop_replication_slot_failed}, state} + + _ -> + cancel_timer(state.check_oid_ref) + {:noreply, %{state | oids: new_oids, poll_ref: nil, check_oid_ref: schedule_check_oids()}} + end + + _ -> + cancel_timer(state.check_oid_ref) + {:noreply, %{state | oids: new_oids, check_oid_ref: schedule_check_oids()}} + end + end + def slot_name_suffix do case Application.get_env(:realtime, :slot_name_suffix) do nil -> "" @@ -180,11 +256,23 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do defp convert_errors(_), do: nil - defp prepare_replication(%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state) do + defp prepare_replication( + %{ + backoff: backoff, + conn: conn, + slot_name: slot_name, + publication: publication, + retry_count: retry_count, + check_oid_ref: check_oid_ref + } = state + ) do case Replications.prepare_replication(conn, slot_name) do {:ok, _} -> - send(self(), :poll) - state + oids = Subscriptions.fetch_publication_tables(conn, publication) + if map_size(oids) > 0, do: send(self(), :poll) + + cancel_timer(check_oid_ref) + %{state | oids: oids, check_oid_ref: schedule_check_oids()} {:error, error} -> log_error("PoolingReplicationPreparationError", error) @@ -195,6 +283,8 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do end end + defp schedule_check_oids, do: Process.send_after(self(), :check_oids, @check_oids_interval) + defp record_list_changes_telemetry(time, tenant_id) do Realtime.Telemetry.execute( [:realtime, :replication, :poller, :query, :stop], diff --git a/lib/extensions/postgres_cdc_rls/replications.ex b/lib/extensions/postgres_cdc_rls/replications.ex index 0f7b96667..b36c89bce 100644 --- a/lib/extensions/postgres_cdc_rls/replications.ex +++ b/lib/extensions/postgres_cdc_rls/replications.ex @@ -49,6 +49,20 @@ defmodule Extensions.PostgresCdcRls.Replications do end end + @spec drop_replication_slot(pid(), String.t()) :: + {:ok, :dropped} | {:error, :slot_not_found | Postgrex.Error.t()} + def drop_replication_slot(conn, slot_name) do + case query( + conn, + "select pg_drop_replication_slot(slot_name) from pg_replication_slots where slot_name = $1", + [slot_name] + ) do + {:ok, %Postgrex.Result{num_rows: 0}} -> {:error, :slot_not_found} + {:ok, _} -> {:ok, :dropped} + {:error, error} -> {:error, error} + end + end + @spec get_pg_stat_activity_diff(pid(), integer()) :: {:ok, integer()} | {:error, Postgrex.Error.t()} def get_pg_stat_activity_diff(conn, db_pid) do diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index 1f4698918..4cfb7f783 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -8,6 +8,7 @@ defmodule Realtime.Extensions.CdcRlsTest do setup :set_mimic_global alias Extensions.PostgresCdcRls + alias Extensions.PostgresCdcRls.ReplicationPoller alias Extensions.PostgresCdcRls.Subscriptions alias PostgresCdcRls.SubscriptionManager alias Postgrex @@ -106,6 +107,76 @@ defmodule Realtime.Extensions.CdcRlsTest do assert !Map.equal?(oids2, oids3) end + test "Replication poller toggles slot when publication tables come and go", %{tenant: tenant} do + # setup/0 already received the "ready" event, which fires only after the poller's init/1 + # (and its Registry.register) has run. :sys.get_state below then blocks until the poller + # finishes handle_continue and has its slot prepared. + [{poller_pid, _}] = Registry.lookup(ReplicationPoller.Registry, tenant.external_id) + + # Use the SubscriptionManager pub connection to drive publication state from the test — + # the poller's own conn is owned by the poller process. + {:ok, _manager_pid, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + + %{oids: initial_oids, slot_name: slot_name} = :sys.get_state(poller_pid) + refute initial_oids == %{} + + assert %Postgrex.Result{rows: [[1]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + + # Drop the publication: poller should drop its slot and clear oids. + Postgrex.query!(conn, "drop publication if exists supabase_realtime_test", []) + send(poller_pid, :check_oids) + %{oids: oids_after_drop, poll_ref: poll_ref_after_drop} = :sys.get_state(poller_pid) + assert oids_after_drop == %{} + assert poll_ref_after_drop == nil + + assert %Postgrex.Result{rows: [[0]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + + # Re-create the publication: poller should recreate the slot and repopulate oids. + Postgrex.query!(conn, "create publication supabase_realtime_test for all tables", []) + send(poller_pid, :check_oids) + %{oids: oids_after_create} = :sys.get_state(poller_pid) + refute oids_after_create == %{} + + assert %Postgrex.Result{rows: [[1]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + end + + test "Replication poller toggles slot when tables are removed from the publication", %{tenant: tenant} do + [{poller_pid, _}] = Registry.lookup(ReplicationPoller.Registry, tenant.external_id) + {:ok, _manager_pid, conn} = PostgresCdcRls.get_manager_conn(tenant.external_id) + + %{oids: initial_oids, slot_name: slot_name} = :sys.get_state(poller_pid) + refute initial_oids == %{} + + assert %Postgrex.Result{rows: [[1]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + + # Publication still exists but has no tables (recreated without FOR ALL TABLES + # since you can't ALTER ... DROP TABLE on a FOR ALL TABLES publication). + Postgrex.query!(conn, "drop publication if exists supabase_realtime_test", []) + Postgrex.query!(conn, "create publication supabase_realtime_test", []) + + send(poller_pid, :check_oids) + %{oids: oids_after_empty, poll_ref: poll_ref_after_empty} = :sys.get_state(poller_pid) + assert oids_after_empty == %{} + assert poll_ref_after_empty == nil + + assert %Postgrex.Result{rows: [[0]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + + # Add a table back to the publication: poller should recreate the slot and repopulate oids. + Postgrex.query!(conn, "alter publication supabase_realtime_test add table public.test", []) + + send(poller_pid, :check_oids) + %{oids: oids_after_add} = :sys.get_state(poller_pid) + refute oids_after_add == %{} + + assert %Postgrex.Result{rows: [[1]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + end + test "Stop tenant supervisor", %{tenant: tenant} do sup = Enum.reduce_while(1..10, nil, fn _, acc -> diff --git a/test/realtime/extensions/cdc_rls/replication_poller_test.exs b/test/realtime/extensions/cdc_rls/replication_poller_test.exs index b5ae73535..07ab18739 100644 --- a/test/realtime/extensions/cdc_rls/replication_poller_test.exs +++ b/test/realtime/extensions/cdc_rls/replication_poller_test.exs @@ -7,6 +7,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do alias Extensions.PostgresCdcRls.MessageDispatcher alias Extensions.PostgresCdcRls.ReplicationPoller, as: Poller alias Extensions.PostgresCdcRls.Replications + alias Extensions.PostgresCdcRls.Subscriptions alias Realtime.Adapters.Changes.{ DeletedRecord, @@ -53,63 +54,11 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do empty_results = {:ok, %Postgrex.Result{rows: [], num_rows: 0}} stub(Replications, :list_changes, fn _, _, _, _, _ -> empty_results end) - %{args: args, tenant: tenant} - end - - test "handles slot in use error and retries", %{args: args} do - tenant_id = args["id"] - - slot_in_use_error = - {:error, - %Postgrex.Error{ - postgres: %{ - code: :object_in_use, - message: "replication slot is active for PID 12345" - } - }} - - stub(Replications, :get_pg_stat_activity_diff, fn _conn, _pid -> {:error, :pid_not_found} end) - stub(Replications, :terminate_backend, fn _conn, _slot -> {:error, :slot_not_found} end) - - expect(Replications, :list_changes, fn _, _, _, _, _ -> slot_in_use_error end) - - start_link_supervised!({Poller, args}) - - assert_receive { - :telemetry, - [:realtime, :replication, :poller, :query, :stop], - %{duration: _}, - %{tenant: ^tenant_id} - }, - 1000 - end - - test "handles slot in use error with pg_stat_activity returning diff", %{args: args} do - tenant_id = args["id"] + # Default to a publication with tables so the poller actually polls. + # Tests that need an empty publication override this stub explicitly. + stub(Subscriptions, :fetch_publication_tables, fn _, _ -> %{{"public", "test"} => [1234]} end) - slot_in_use_error = - {:error, - %Postgrex.Error{ - postgres: %{ - code: :object_in_use, - message: "replication slot is active for PID 12345" - } - }} - - stub(Replications, :get_pg_stat_activity_diff, fn _conn, _pid -> {:ok, 42} end) - stub(Replications, :terminate_backend, fn _conn, _slot -> {:error, :slot_not_found} end) - - expect(Replications, :list_changes, fn _, _, _, _, _ -> slot_in_use_error end) - - start_link_supervised!({Poller, args}) - - assert_receive { - :telemetry, - [:realtime, :replication, :poller, :query, :stop], - %{duration: _}, - %{tenant: ^tenant_id} - }, - 1000 + %{args: args, tenant: tenant} end test "handles prepare_replication failure and retries", %{args: args} do @@ -143,7 +92,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do stub(Replications, :get_pg_stat_activity_diff, fn _conn, _pid -> {:ok, 42} end) stub(Replications, :list_changes, fn _, _, _, _, _ -> slot_in_use_error end) - stub(Replications, :terminate_backend, fn _conn, _slot -> {:ok, :terminated} end) + expect(Replications, :terminate_backend, fn _conn, _slot -> {:ok, :terminated} end) pid = start_link_supervised!({Poller, args}) @@ -391,6 +340,69 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do assert {:ok, %RateCounter{sum: sum}} = RateCounterHelper.tick!(rate) assert sum == 3 end + + test "does not poll WAL when publication has no tables", %{args: args} do + tenant_id = args["id"] + + expect(Subscriptions, :fetch_publication_tables, fn _, _ -> %{} end) + reject(&Replications.list_changes/5) + + start_link_supervised!({Poller, args}) + + refute_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, %{tenant: ^tenant_id}}, 200 + end + + test "drops replication slot and stops polling when tables vanish", %{args: args} do + tenant_id = args["id"] + + expect(Replications, :drop_replication_slot, fn _conn, _slot -> {:ok, :dropped} end) + + pid = start_link_supervised!({Poller, args}) + + # First poll happens with the default non-empty stub. + assert_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, %{tenant: ^tenant_id}}, 500 + + expect(Subscriptions, :fetch_publication_tables, fn _, _ -> %{} end) + reject(&Replications.list_changes/5) + + send(pid, :check_oids) + # Force the GenServer to process :check_oids before we assert. + :sys.get_state(pid) + + refute_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, %{tenant: ^tenant_id}}, 200 + end + + test "resumes polling when tables appear via :check_oids", %{args: args} do + tenant_id = args["id"] + + expect(Subscriptions, :fetch_publication_tables, fn _, _ -> %{} end) + + pid = start_link_supervised!({Poller, args}) + + refute_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, %{tenant: ^tenant_id}}, 200 + + # Tables are added to the publication. Next :check_oids should trigger + # prepare_replication + an initial poll. + expect(Subscriptions, :fetch_publication_tables, fn _, _ -> %{{"public", "test"} => [1234]} end) + send(pid, :check_oids) + + assert_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, %{tenant: ^tenant_id}}, 1000 + end + + test "shuts down when slot drop fails so the temp slot is released with the connection", + %{args: args} do + pid = start_supervised!({Poller, args}, restart: :temporary) + + assert_receive {:telemetry, [:realtime, :replication, :poller, :query, :stop], _, _}, 500 + + expect(Subscriptions, :fetch_publication_tables, fn _, _ -> %{} end) + expect(Replications, :drop_replication_slot, fn _, _ -> {:error, :boom} end) + + ref = Process.monitor(pid) + send(pid, :check_oids) + + assert_receive {:DOWN, ^ref, :process, ^pid, {:shutdown, :drop_replication_slot_failed}}, 1000 + end end @columns [ @@ -778,7 +790,7 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do end test "stops cleanly when database connection fails", %{args: args} do - stub(Realtime.Database, :connect_db, fn _settings -> {:error, :econnrefused} end) + expect(Realtime.Database, :connect_db, fn _settings -> {:error, :econnrefused} end) pid = start_supervised!({Poller, args}, restart: :temporary) ref = Process.monitor(pid) diff --git a/test/realtime/extensions/cdc_rls/replications_test.exs b/test/realtime/extensions/cdc_rls/replications_test.exs index bd6c68491..a442db853 100644 --- a/test/realtime/extensions/cdc_rls/replications_test.exs +++ b/test/realtime/extensions/cdc_rls/replications_test.exs @@ -65,6 +65,25 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationsTest do end end + describe "drop_replication_slot/2" do + test "returns slot_not_found when slot does not exist", %{conn: conn} do + assert {:error, :slot_not_found} = + Replications.drop_replication_slot(conn, "nonexistent_slot_#{:rand.uniform(999_999)}") + end + + test "drops an existing inactive slot", %{conn: conn} do + slot_name = "test_drop_slot_#{:rand.uniform(999_999)}" + Postgrex.query!(conn, "SELECT pg_create_logical_replication_slot($1, 'wal2json')", [slot_name]) + + assert {:ok, :dropped} = Replications.drop_replication_slot(conn, slot_name) + + %{rows: [[count]]} = + Postgrex.query!(conn, "SELECT count(*)::int FROM pg_replication_slots WHERE slot_name = $1", [slot_name]) + + assert count == 0 + end + end + describe "prepare_replication/2" do test "creates a replication slot when it does not exist", %{conn: conn} do slot_name = "test_prep_slot_#{:rand.uniform(999_999)}"