diff --git a/example.exs b/example.exs index de4bb14..cc3ebd2 100644 --- a/example.exs +++ b/example.exs @@ -11,3 +11,14 @@ DynamicSupervisor.start_child( MainApp.DynamicSupervisor, {Publishers.File, ["/tmp/replication.log"]} ) + +DynamicSupervisor.start_child( + MainApp.DynamicSupervisor, + {Publishers.Postgres, + connection: [ + host: "localhost", + username: "postgres", + database: "postgres", + password: "postgres" + ]} +) diff --git a/lib/publishers/postgres/connection.ex b/lib/publishers/postgres/connection.ex new file mode 100644 index 0000000..b8d27a2 --- /dev/null +++ b/lib/publishers/postgres/connection.ex @@ -0,0 +1,52 @@ +defmodule Publishers.Postgres.Connection do + @moduledoc """ + Simple Postgres connection for executing queries. + """ + require Logger + @behaviour Postgrex.SimpleConnection + + def child_spec(args) do + %{ + id: Publishers.Postgres.Connection, + start: {Publishers.Postgres.Connection, :start_link, [args]} + } + end + + def start_link(opts) do + # Automatically reconnect if we lose connection. + extra_opts = [ + auto_reconnect: true + ] + + Postgrex.SimpleConnection.start_link(__MODULE__, :ok, extra_opts ++ opts) + end + + @impl Postgrex.SimpleConnection + def init(:ok) do + {:ok, %{from: nil}} + end + + @impl Postgrex.SimpleConnection + def handle_call({:query, query}, from, state) do + {:query, query, %{state | from: from}} + end + + @impl Postgrex.SimpleConnection + def handle_result(results, state) when is_list(results) do + Postgrex.SimpleConnection.reply(state.from, results) + + {:noreply, state} + end + + @impl Postgrex.SimpleConnection + def handle_result(%Postgrex.Error{} = error, state) do + Postgrex.SimpleConnection.reply(state.from, error) + + {:noreply, state} + end + + @impl Postgrex.SimpleConnection + def notify(_, _, _) do + Logger.warning("Function `notify` not implemented for Postgres connection.") + end +end diff --git a/lib/publishers/postgres/publisher.ex b/lib/publishers/postgres/publisher.ex new file mode 100644 index 0000000..a474e84 --- /dev/null +++ b/lib/publishers/postgres/publisher.ex @@ -0,0 +1,35 @@ +defmodule Publishers.Postgres.Publisher do + use GenServer + + require Logger + + @behaviour Core.PublisherContract + + def start_link(init_args) do + GenServer.start_link(__MODULE__, init_args) + end + + @impl GenServer + def init(init_args) do + Registry.register( + PublisherRegistry, + :publishers, + {Publishers.Postgres.Publisher, :handle_message} + ) + + {:ok, init_args} + end + + @impl Core.PublisherContract + def handle_message(server_pid, message) do + Logger.debug("Postgres publisher handling message") + Logger.debug(message: message) + GenServer.cast(server_pid, {:replication_message, message}) + end + + @impl GenServer + def handle_cast({:replication_message, message}, state) do + Logger.info("Sending message to posgres", message: message) + {:noreply, state} + end +end diff --git a/lib/publishers/postgres/supervisor.ex b/lib/publishers/postgres/supervisor.ex new file mode 100644 index 0000000..477628d --- /dev/null +++ b/lib/publishers/postgres/supervisor.ex @@ -0,0 +1,21 @@ +defmodule Publishers.Postgres do + @moduledoc """ + Entry point for Postgres publisher. + """ + use Boundary, deps: [Core] + use Supervisor + + def start_link(init_args) do + Supervisor.start_link(__MODULE__, init_args) + end + + @impl Supervisor + def init(connection: con_config) do + children = [ + {Publishers.Postgres.Publisher, %{}}, + {Publishers.Postgres.Connection, con_config} + ] + + Supervisor.init(children, strategy: :one_for_one) + end +end