diff --git a/lib/util/loader.ex b/lib/util/loader.ex new file mode 100644 index 0000000..f2ccdea --- /dev/null +++ b/lib/util/loader.ex @@ -0,0 +1,128 @@ +defmodule Util.Loader do + alias __MODULE__.Task + alias __MODULE__.Results + + def load(definitions, opts \\ []) do + global_timeout = Keyword.get(opts, :whole_operation_timeout, :infinity) + per_resource_timeout = Keyword.get(opts, :per_resource_timeout, :infinity) + + Wormhole.capture(fn -> + tasks = Enum.map(definitions, fn definition -> + {id, fun, opts} = definition + + Task.new(id, fun, opts) + end) + + with( + :ok <- check_unknown_deps(tasks), + :ok <- check_deps_cycle(tasks) + ) do + execute(tasks, Results.new()) + end + end, timeout: global_timeout) + |> case do + {:ok, res} -> res + e -> e + end + end + + defp execute(tasks, results) do + {runnable, rest} = find_runnable(tasks, already_executed: Results.executed_task_ids(results)) + + if runnable == [] && rest != [] do + {:error, :unprocessed, Enum.map(rest, fn r -> r.id end)} + else + new_results = + try do + runnable + |> Enum.map(fn t -> Task.execute_async(t, Results.fetch(results, t.deps)) end) + |> Task.await_many() + |> Enum.into(%{}) + rescue + e -> + IO.inspect("AAAAAAAAAAAAAAAAA", label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n") + IO.inspect(e, label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n") + %{} + end + + case process(new_results) do + {:ok, new_results} -> + results = Map.merge(results, new_results) + + if rest == [] do + {:ok, results} + else + execute(rest, results) + end + + {:error, new_results} -> + results = Map.merge(results, new_results) + + {:error, results} + end + end + end + + defp process(raw_results) do + res = Enum.reduce(raw_results, {:ok, []}, fn r, {type, acc} -> + case r do + {name, {:ok, val}} -> {type, acc ++ [{name, val}]} + {name, e} -> {:error, acc ++ [{name, e}]} + end + end) + + {elem(res, 0), Enum.into(elem(res, 1), %{})} + end + + defp check_deps_cycle(tasks, visited \\ []) do + {visitable, rest} = find_runnable(tasks, already_executed: visited) + + cond do + visitable == [] && rest == [] -> + :ok + + visitable == [] && rest != [] -> + {:error, :dependency_cycle} + + true -> + check_deps_cycle(rest, visited ++ Enum.map(visitable, &(&1.id))) + end + end + + defp check_unknown_deps(tasks) do + names = Enum.map(tasks, &(&1.id)) + + tasks + |> Enum.map(fn task -> + unknown_deps = Enum.filter(task.deps, fn d -> d not in names end) + + {task.id, unknown_deps} + end) + |> Enum.filter(fn {id, unknown_deps} -> + unknown_deps != [] + end) + |> case do + [] -> :ok + e -> {:error, :unknown_dependency, Enum.into(e, %{})} + end + end + + defp subset?(arr1, arr2) do + MapSet.subset?(MapSet.new(arr1), MapSet.new(arr2)) + end + + defp find_runnable(tasks, already_executed: ids) do + Enum.split_with(tasks, fn t -> subset?(t.deps, ids) end) + end + + + defmodule Results do + def new(), do: %{} + def executed_task_ids(r), do: Map.keys(r) + + def fetch(r, ids) do + Enum.filter(r, fn {k, v} -> k in ids end) |> Enum.into(%{}) + end + end + +end diff --git a/lib/util/loader/task.ex b/lib/util/loader/task.ex new file mode 100644 index 0000000..cfde992 --- /dev/null +++ b/lib/util/loader/task.ex @@ -0,0 +1,42 @@ +defmodule Util.Loader.Task do + @moduledoc """ + An individual loading task. + """ + + defstruct [ + :id, + :fun, + :deps, + :timeout + ] + + def new(id, fun, opts) do + timeout = Keyword.get(opts, :timeout, :infinity) + deps = Keyword.get(opts, :depends_on, []) + + {:ok, struct(__MODULE__, + id: id, + fun: fun, + deps: deps, + timeout: timeout + )} + end + + def execute(task, deps) do + Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, [timeout: task.timeout]) + |> case do + {:ok, {:ok, res}} -> {:ok, res} + {:ok, {:error, err}} -> {:error, err} + {:ok, other} -> {:error, :unexpected_result_type, other} + {:error, err} -> {:error, err} + end + end + + defp dispatch_call(fun, deps) do + case :erlang.fun_info(fun)[:arity] do + 0 -> fun.() + 1 -> fun.(deps) + 2 -> fun.(deps, []) + end + end +end diff --git a/test/util/loader/task_test.exs b/test/util/loader/task_test.exs new file mode 100644 index 0000000..070e510 --- /dev/null +++ b/test/util/loader/task_test.exs @@ -0,0 +1,52 @@ +defmodule Util.Loader.TaskTest do + use ExUnit.Case, async: true + + test "construction" do + assert {:ok, _} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, depends_on: [:b, :d]) + end + + describe "execution" do + test "ok result" do + assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, []) + assert {:ok, "A"} = Util.Loader.Task.execute(task, %{}) + end + + test "error result" do + assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:error, "B"} end, []) + assert {:error, "B"} = Util.Loader.Task.execute(task, %{}) + end + + test "exception result" do + fun = fn -> raise "AAA" end + + assert {:ok, task} = Util.Loader.Task.new(:a, fun, []) + assert {:error, {:shutdown, %RuntimeError{message: "AAA"}}} = Util.Loader.Task.execute(task, %{}) + end + + test "non-tuple result" do + assert {:ok, task} = Util.Loader.Task.new(:a, fn -> "A" end, []) + assert {:error, :unexpected_result_type, "A"} = Util.Loader.Task.execute(task, %{}) + end + + test "timeout result" do + fun = fn -> + :timer.sleep(1000) + {:ok, "A"} + end + + assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100) + assert {:error, {:timeout, 100}} = Util.Loader.Task.execute(task, []) + end + + test "with deps" do + deps = %{a: "A", b: "B"} + + fun = fn deps -> + {:ok, deps.a <> deps.b} + end + + assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100) + assert {:ok, "AB"} = Util.Loader.Task.execute(task, deps) + end + end +end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs new file mode 100644 index 0000000..50f7b12 --- /dev/null +++ b/test/util/loader_test.exs @@ -0,0 +1,122 @@ +defmodule Util.LoaderTest do + use ExUnit.Case, async: true + + alias Util.Loader + + test "empty loaders return :ok" do + assert {:ok, %{}} = Loader.load([]) + end + + test "it can laod things in parallel" do + assert {:ok, resources} = Loader.load([ + {:user, fn -> {:ok, "Mike"} end}, + {:org, fn -> {:ok, "Acme"} end} + ]) + + assert resources.user == "Mike" + assert resources.org == "Acme" + end + + test "it can wait on dependencies" do + assert {:ok, resources} = Loader.load([ + {:user, fn -> {:ok, "Mike"} end}, + {:permissions, fn deps -> {:ok, "#{deps.user} is an admin"} end, depends_on: [:user]} + ]) + + assert resources.user == "Mike" + assert resources.permissions == "Mike is an admin" + end + + test "multiple tasks with dependencies" do + assert {:ok, resources} = Loader.load([ + {:a, fn -> {:ok, "a"} end}, + {:b, fn deps -> {:ok, deps.a <> "b"} end, depends_on: [:a]}, + {:c, fn deps -> {:ok, deps.b <> "c"} end, depends_on: [:b]}, + ]) + + assert resources.a == "a" + assert resources.b == "ab" + assert resources.c == "abc" + end + + test "it can return errors" do + assert {:error, resources} = Loader.load([ + {:user, fn -> {:ok, "Mike"} end}, + {:org, fn -> {:error, :not_found} end} + ]) + + assert resources.org == {:error, :not_found} + end + + test "it returns an error if an unknown dependency is required" do + resources = [ + {:a, fn -> {:ok, nil} end}, + {:b, fn -> {:ok, nil} end, depends_on: [:c]}, + ] + + assert {:error, :unknown_dependency, %{b: [:c]}} = Loader.load(resources) + end + + test "it returns an error if there is a cycle in the deps" do + resources = [ + {:a, fn -> {:ok, nil} end}, + {:b, fn -> {:ok, nil} end, depends_on: [:c]}, + {:c, fn -> {:ok, nil} end, depends_on: [:b]}, + ] + + assert {:error, :dependency_cycle} = Loader.load(resources) + + resources = [ + {:a, fn -> {:ok, nil} end}, + {:b, fn -> {:ok, nil} end, depends_on: [:d]}, + {:c, fn -> {:ok, nil} end, depends_on: [:b]}, + {:d, fn -> {:ok, nil} end, depends_on: [:c]}, + ] + + assert {:error, :dependency_cycle} = Loader.load(resources) + end + + test "it handles raised exceptions" do + assert {:error, resources} = Loader.load([ + {:a, fn -> raise "failure" end}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ]) + + assert resources.a == {:error, {:shutdown, %RuntimeError{message: "failure"}}} + end + + test "it respects the timeout" do + assert {:error, {:timeout, 100}} = Loader.load([ + {:a, fn -> :timer.sleep(300) end}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ], whole_operation_timeout: 100) + end + + test "it respects per task timeout" do + assert {:error, resources} = Loader.load([ + {:a, fn -> :timer.sleep(300) end, timeout: 100}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ]) + + assert resources.a == {:error, {:timeout, 100}} + end + + test "it respects per task timeout defined on global level" do + assert {:error, resources} = Loader.load([ + {:a, fn -> :timer.sleep(300) end}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ], per_resource_timeout: 100) + + assert resources.a == {:error, {:timeout, 100}} + end + + test "it support fail-fast" do + assert {:error, resources} = Loader.load([ + {:a, fn -> :timer.sleep(10000) end}, + {:b, fn -> raise "aaa" end} + ]) + + assert resources.b == {:error, {:shutdown, %RuntimeError{message: "aaa"}}} + assert resources.a == "AAA" + end +end