Skip to content
128 changes: 128 additions & 0 deletions lib/util/loader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
defmodule Util.Loader do
alias __MODULE__.Task
alias __MODULE__.Results

def load(definitions, opts \\ []) do
global_timeout = Keyword.get(opts, :whole_operation_timeout, :infinity)
per_resource_timeout = Keyword.get(opts, :per_resource_timeout, :infinity)

Wormhole.capture(fn ->
tasks = Enum.map(definitions, fn definition ->
{id, fun, opts} = definition

Task.new(id, fun, opts)
end)

with(
:ok <- check_unknown_deps(tasks),
:ok <- check_deps_cycle(tasks)
) do
execute(tasks, Results.new())
end
end, timeout: global_timeout)
|> case do
{:ok, res} -> res
e -> e
end
end

defp execute(tasks, results) do
{runnable, rest} = find_runnable(tasks, already_executed: Results.executed_task_ids(results))

if runnable == [] && rest != [] do
{:error, :unprocessed, Enum.map(rest, fn r -> r.id end)}
else
new_results =
try do
runnable
|> Enum.map(fn t -> Task.execute_async(t, Results.fetch(results, t.deps)) end)
|> Task.await_many()
|> Enum.into(%{})
rescue
e ->
IO.inspect("AAAAAAAAAAAAAAAAA", label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n")
IO.inspect(e, label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n")
%{}
end

case process(new_results) do
{:ok, new_results} ->
results = Map.merge(results, new_results)

if rest == [] do
{:ok, results}
else
execute(rest, results)
end

{:error, new_results} ->
results = Map.merge(results, new_results)

{:error, results}
end
end
end

defp process(raw_results) do
res = Enum.reduce(raw_results, {:ok, []}, fn r, {type, acc} ->
case r do
{name, {:ok, val}} -> {type, acc ++ [{name, val}]}
{name, e} -> {:error, acc ++ [{name, e}]}
end
end)

{elem(res, 0), Enum.into(elem(res, 1), %{})}
end

defp check_deps_cycle(tasks, visited \\ []) do
{visitable, rest} = find_runnable(tasks, already_executed: visited)

cond do
visitable == [] && rest == [] ->
:ok

visitable == [] && rest != [] ->
{:error, :dependency_cycle}

true ->
check_deps_cycle(rest, visited ++ Enum.map(visitable, &(&1.id)))
end
end

defp check_unknown_deps(tasks) do
names = Enum.map(tasks, &(&1.id))

tasks
|> Enum.map(fn task ->
unknown_deps = Enum.filter(task.deps, fn d -> d not in names end)

{task.id, unknown_deps}
end)
|> Enum.filter(fn {id, unknown_deps} ->
unknown_deps != []
end)
|> case do
[] -> :ok
e -> {:error, :unknown_dependency, Enum.into(e, %{})}
end
end

defp subset?(arr1, arr2) do
MapSet.subset?(MapSet.new(arr1), MapSet.new(arr2))
end

defp find_runnable(tasks, already_executed: ids) do
Enum.split_with(tasks, fn t -> subset?(t.deps, ids) end)
end


defmodule Results do
def new(), do: %{}
def executed_task_ids(r), do: Map.keys(r)

def fetch(r, ids) do
Enum.filter(r, fn {k, v} -> k in ids end) |> Enum.into(%{})
end
end

end
42 changes: 42 additions & 0 deletions lib/util/loader/task.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Util.Loader.Task do
@moduledoc """
An individual loading task.
"""

defstruct [
:id,
:fun,
:deps,
:timeout
]

def new(id, fun, opts) do
timeout = Keyword.get(opts, :timeout, :infinity)
deps = Keyword.get(opts, :depends_on, [])

{:ok, struct(__MODULE__,
id: id,
fun: fun,
deps: deps,
timeout: timeout
)}
end

def execute(task, deps) do
Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, [timeout: task.timeout])
|> case do
{:ok, {:ok, res}} -> {:ok, res}
{:ok, {:error, err}} -> {:error, err}
{:ok, other} -> {:error, :unexpected_result_type, other}
{:error, err} -> {:error, err}
end
end

defp dispatch_call(fun, deps) do
case :erlang.fun_info(fun)[:arity] do
0 -> fun.()
1 -> fun.(deps)
2 -> fun.(deps, [])
end
end
end
52 changes: 52 additions & 0 deletions test/util/loader/task_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Util.Loader.TaskTest do
use ExUnit.Case, async: true

test "construction" do
assert {:ok, _} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, depends_on: [:b, :d])
end

describe "execution" do
test "ok result" do
assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, [])
assert {:ok, "A"} = Util.Loader.Task.execute(task, %{})
end

test "error result" do
assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:error, "B"} end, [])
assert {:error, "B"} = Util.Loader.Task.execute(task, %{})
end

test "exception result" do
fun = fn -> raise "AAA" end

assert {:ok, task} = Util.Loader.Task.new(:a, fun, [])
assert {:error, {:shutdown, %RuntimeError{message: "AAA"}}} = Util.Loader.Task.execute(task, %{})
end

test "non-tuple result" do
assert {:ok, task} = Util.Loader.Task.new(:a, fn -> "A" end, [])
assert {:error, :unexpected_result_type, "A"} = Util.Loader.Task.execute(task, %{})
end

test "timeout result" do
fun = fn ->
:timer.sleep(1000)
{:ok, "A"}
end

assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100)
assert {:error, {:timeout, 100}} = Util.Loader.Task.execute(task, [])
end

test "with deps" do
deps = %{a: "A", b: "B"}

fun = fn deps ->
{:ok, deps.a <> deps.b}
end

assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100)
assert {:ok, "AB"} = Util.Loader.Task.execute(task, deps)
end
end
end
122 changes: 122 additions & 0 deletions test/util/loader_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
defmodule Util.LoaderTest do
use ExUnit.Case, async: true

alias Util.Loader

test "empty loaders return :ok" do
assert {:ok, %{}} = Loader.load([])
end

test "it can laod things in parallel" do
assert {:ok, resources} = Loader.load([
{:user, fn -> {:ok, "Mike"} end},
{:org, fn -> {:ok, "Acme"} end}
])

assert resources.user == "Mike"
assert resources.org == "Acme"
end

test "it can wait on dependencies" do
assert {:ok, resources} = Loader.load([
{:user, fn -> {:ok, "Mike"} end},
{:permissions, fn deps -> {:ok, "#{deps.user} is an admin"} end, depends_on: [:user]}
])

assert resources.user == "Mike"
assert resources.permissions == "Mike is an admin"
end

test "multiple tasks with dependencies" do
assert {:ok, resources} = Loader.load([
{:a, fn -> {:ok, "a"} end},
{:b, fn deps -> {:ok, deps.a <> "b"} end, depends_on: [:a]},
{:c, fn deps -> {:ok, deps.b <> "c"} end, depends_on: [:b]},
])

assert resources.a == "a"
assert resources.b == "ab"
assert resources.c == "abc"
end

test "it can return errors" do
assert {:error, resources} = Loader.load([
{:user, fn -> {:ok, "Mike"} end},
{:org, fn -> {:error, :not_found} end}
])

assert resources.org == {:error, :not_found}
end

test "it returns an error if an unknown dependency is required" do
resources = [
{:a, fn -> {:ok, nil} end},
{:b, fn -> {:ok, nil} end, depends_on: [:c]},
]

assert {:error, :unknown_dependency, %{b: [:c]}} = Loader.load(resources)
end

test "it returns an error if there is a cycle in the deps" do
resources = [
{:a, fn -> {:ok, nil} end},
{:b, fn -> {:ok, nil} end, depends_on: [:c]},
{:c, fn -> {:ok, nil} end, depends_on: [:b]},
]

assert {:error, :dependency_cycle} = Loader.load(resources)

resources = [
{:a, fn -> {:ok, nil} end},
{:b, fn -> {:ok, nil} end, depends_on: [:d]},
{:c, fn -> {:ok, nil} end, depends_on: [:b]},
{:d, fn -> {:ok, nil} end, depends_on: [:c]},
]

assert {:error, :dependency_cycle} = Loader.load(resources)
end

test "it handles raised exceptions" do
assert {:error, resources} = Loader.load([
{:a, fn -> raise "failure" end},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
])

assert resources.a == {:error, {:shutdown, %RuntimeError{message: "failure"}}}
end

test "it respects the timeout" do
assert {:error, {:timeout, 100}} = Loader.load([
{:a, fn -> :timer.sleep(300) end},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
], whole_operation_timeout: 100)
end

test "it respects per task timeout" do
assert {:error, resources} = Loader.load([
{:a, fn -> :timer.sleep(300) end, timeout: 100},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
])

assert resources.a == {:error, {:timeout, 100}}
end

test "it respects per task timeout defined on global level" do
assert {:error, resources} = Loader.load([
{:a, fn -> :timer.sleep(300) end},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
], per_resource_timeout: 100)

assert resources.a == {:error, {:timeout, 100}}
end

test "it support fail-fast" do
assert {:error, resources} = Loader.load([
{:a, fn -> :timer.sleep(10000) end},
{:b, fn -> raise "aaa" end}
])

assert resources.b == {:error, {:shutdown, %RuntimeError{message: "aaa"}}}
assert resources.a == "AAA"
end
end