Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions example.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,14 @@ DynamicSupervisor.start_child(
MainApp.DynamicSupervisor,
{Publishers.File, ["/tmp/replication.log"]}
)

DynamicSupervisor.start_child(
MainApp.DynamicSupervisor,
{Publishers.Postgres,
connection: [
host: "localhost",
username: "postgres",
database: "postgres",
password: "postgres"
]}
)
52 changes: 52 additions & 0 deletions lib/publishers/postgres/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Publishers.Postgres.Connection do
@moduledoc """
Simple Postgres connection for executing queries.
"""
require Logger
@behaviour Postgrex.SimpleConnection

def child_spec(args) do
%{
id: Publishers.Postgres.Connection,
start: {Publishers.Postgres.Connection, :start_link, [args]}
}
end

def start_link(opts) do
# Automatically reconnect if we lose connection.
extra_opts = [
auto_reconnect: true
]

Postgrex.SimpleConnection.start_link(__MODULE__, :ok, extra_opts ++ opts)
end

@impl Postgrex.SimpleConnection
def init(:ok) do
{:ok, %{from: nil}}
end

@impl Postgrex.SimpleConnection
def handle_call({:query, query}, from, state) do
{:query, query, %{state | from: from}}
end

@impl Postgrex.SimpleConnection
def handle_result(results, state) when is_list(results) do
Postgrex.SimpleConnection.reply(state.from, results)

{:noreply, state}
end

@impl Postgrex.SimpleConnection
def handle_result(%Postgrex.Error{} = error, state) do
Postgrex.SimpleConnection.reply(state.from, error)

{:noreply, state}
end

@impl Postgrex.SimpleConnection
def notify(_, _, _) do
Logger.warning("Function `notify` not implemented for Postgres connection.")
end
end
35 changes: 35 additions & 0 deletions lib/publishers/postgres/publisher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
defmodule Publishers.Postgres.Publisher do
use GenServer

require Logger

@behaviour Core.PublisherContract

def start_link(init_args) do
GenServer.start_link(__MODULE__, init_args)
end

@impl GenServer
def init(init_args) do
Registry.register(
PublisherRegistry,
:publishers,
{Publishers.Postgres.Publisher, :handle_message}
)

{:ok, init_args}
end

@impl Core.PublisherContract
def handle_message(server_pid, message) do
Logger.debug("Postgres publisher handling message")
Logger.debug(message: message)
GenServer.cast(server_pid, {:replication_message, message})
end

@impl GenServer
def handle_cast({:replication_message, message}, state) do
Logger.info("Sending message to posgres", message: message)
{:noreply, state}
end
end
21 changes: 21 additions & 0 deletions lib/publishers/postgres/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Publishers.Postgres do
@moduledoc """
Entry point for Postgres publisher.
"""
use Boundary, deps: [Core]
use Supervisor

def start_link(init_args) do
Supervisor.start_link(__MODULE__, init_args)
end

@impl Supervisor
def init(connection: con_config) do
children = [
{Publishers.Postgres.Publisher, %{}},
{Publishers.Postgres.Connection, con_config}
]

Supervisor.init(children, strategy: :one_for_one)
end
end