Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 56 additions & 33 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
from
sub_tables
on conflict
-- coalesce needed: NULL != NULL in unique constraints; NULL selected_columns means all columns
(subscription_id, entity, filters, action_filter, coalesce(selected_columns, '{}'))
do update set
claims = excluded.claims,
Expand Down Expand Up @@ -258,55 +259,77 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
@spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
def parse_subscription_params(params) do
action_filter = action_filter(params)
selected_columns = parse_select(params)

case params do
%{"schema" => schema, "table" => table, "filter" => filter}
when is_binary(schema) and is_binary(table) and is_binary(filter) ->
case parse_filters(filter) do
{:ok, filters} -> {:ok, {action_filter, schema, table, filters, selected_columns}}
{:error, reason} -> {:error, "Error parsing `filter` params: #{reason}"}
end

%{"schema" => schema, "table" => table}
when is_binary(schema) and is_binary(table) and not is_map_key(params, "filter") ->
{:ok, {action_filter, schema, table, [], selected_columns}}
with {:ok, selected_columns} <- parse_select(params) do
case params do
%{"schema" => schema, "table" => table, "filter" => filter}
when is_binary(schema) and is_binary(table) and is_binary(filter) ->
case parse_filters(filter) do
{:ok, filters} ->
case reject_select_on_wildcard(schema, table, selected_columns) do
:ok -> {:ok, {action_filter, schema, table, filters, selected_columns}}
error -> error
end

{:error, reason} ->
{:error, "Error parsing `filter` params: #{reason}"}
end

%{"schema" => schema, "table" => table}
when is_binary(schema) and is_binary(table) and not is_map_key(params, "filter") ->
case reject_select_on_wildcard(schema, table, selected_columns) do
:ok -> {:ok, {action_filter, schema, table, [], selected_columns}}
error -> error
end

%{"schema" => schema}
when is_binary(schema) and not is_map_key(params, "table") and
not is_map_key(params, "filter") ->
{:ok, {action_filter, schema, "*", [], selected_columns}}
%{"schema" => schema}
when is_binary(schema) and not is_map_key(params, "table") and
not is_map_key(params, "filter") ->
case reject_select_on_wildcard(schema, "*", selected_columns) do
:ok -> {:ok, {action_filter, schema, "*", [], selected_columns}}
error -> error
end

%{"table" => table}
when is_binary(table) and not is_map_key(params, "schema") and
not is_map_key(params, "filter") ->
{:ok, {action_filter, "public", table, [], selected_columns}}
%{"table" => table}
when is_binary(table) and not is_map_key(params, "schema") and
not is_map_key(params, "filter") ->
case reject_select_on_wildcard("public", table, selected_columns) do
:ok -> {:ok, {action_filter, "public", table, [], selected_columns}}
error -> error
end

map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
{:error,
"No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}
map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
{:error,
"No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: <redacted>"}

error ->
{:error,
"No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
error ->
{:error,
"No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: #{inspect(error)}"}
end
end
end

defp parse_select(%{"select" => cols}) when is_list(cols) do
case Enum.filter(cols, &is_binary/1) do
[] -> nil
valid -> valid
[] -> {:ok, nil}
valid -> {:ok, valid}
end
end

defp parse_select(%{"select" => str}) when is_binary(str) do
case str |> String.split(",") |> Enum.reject(&(&1 == "")) do
[] -> nil
cols -> cols
end
{:error, "Error parsing `select` params: expected a list of column name strings, e.g. select: [\"col1\", \"col2\"]"}
end

defp parse_select(_), do: {:ok, nil}

defp reject_select_on_wildcard(_schema, _table, nil), do: :ok

defp reject_select_on_wildcard(schema, table, _selected_columns)
when schema == "*" or table == "*" do
{:error, "Column selection is not supported for wildcard subscriptions. Provide an explicit schema and table name."}
end

defp parse_select(_), do: nil
defp reject_select_on_wildcard(_schema, _table, _selected_columns), do: :ok

defp action_filter(%{"event" => "*"}), do: "*"

Expand Down
56 changes: 27 additions & 29 deletions test/realtime/extensions/cdc_rls/subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -619,31 +619,15 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do
})
end

test "user can pass a comma-separated string of column names as an alternative syntax" do
assert {:ok, {"*", "public", "messages", [], ["id", "details"]}} =
test "passing a string to select is rejected with a clear error message" do
assert {:error, msg} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "messages",
"select" => "id,details"
})
end

test "consecutive commas in the select string are treated as empty segments and ignored" do
assert {:ok, {"*", "public", "messages", [], ["id", "details"]}} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "messages",
"select" => "id,,details"
})
end

test "column names with surrounding spaces are not trimmed and will be rejected by the database" do
assert {:ok, {"*", "public", "messages", [], ["id", " details"]}} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "messages",
"select" => "id, details"
})
assert msg =~ "`select`"
end

test "non-binary entries in a select list are silently dropped" do
Expand All @@ -655,22 +639,15 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do
})
end

test "passing an empty select string is treated as no column selection" do
assert {:ok, {"*", "public", "messages", [], nil}} =
test "passing any string value to select is rejected" do
assert {:error, msg} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "messages",
"select" => ""
})
end

test "passing a bare comma as the select string is treated as no column selection" do
assert {:ok, {"*", "public", "messages", [], nil}} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "messages",
"select" => ","
})
assert msg =~ "`select`"
end

test "user can combine column selection with a row filter" do
Expand Down Expand Up @@ -753,6 +730,27 @@ defmodule Realtime.Extensions.PostgresCdcRls.SubscriptionsTest do

assert msg =~ "invalid column for select nonexistent_column"
end

test "user gets an error when using select with a schema-only (wildcard table) subscription" do
assert {:error, msg} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"select" => ["id"]
})

assert msg =~ "wildcard"
end

test "user gets an error when using select with an explicit wildcard table" do
assert {:error, msg} =
Subscriptions.parse_subscription_params(%{
"schema" => "public",
"table" => "*",
"select" => ["id"]
})

assert msg =~ "wildcard"
end
end

defp create_subscriptions(conn, num) do
Expand Down
Loading