From 3520cc963203afa1c3aec24f1bd4f2cce4ed1e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Wed, 12 Oct 2022 18:23:52 +0000 Subject: [PATCH 01/15] Bootstrap parallel loader --- lib/util/loader.ex | 20 ++++++++++++++++++++ test/util/loader_test.exs | 24 ++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 lib/util/loader.ex create mode 100644 test/util/loader_test.exs diff --git a/lib/util/loader.ex b/lib/util/loader.ex new file mode 100644 index 0000000..1b38730 --- /dev/null +++ b/lib/util/loader.ex @@ -0,0 +1,20 @@ +defmodule Util.Loader do + def load(tasks) do + with :ok <- validate(tasks) do + {:ok, Enum.map(tasks, fn {name, fun} -> + Task.async(fn -> + case fun.([], []) do + {:ok, res} -> {name, res} + e -> {:error, e} + end + end) + end) + |> Task.await_many() + |> Enum.into(%{})} + end + end + + defp validate(tasks) do + :ok + end +end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs new file mode 100644 index 0000000..a8502b7 --- /dev/null +++ b/test/util/loader_test.exs @@ -0,0 +1,24 @@ +defmodule Util.LoaderTest do + use ExUnit.Case, async: true + + alias Util.Loader + + test "it can laod things in parallel" do + assert {:ok, results} = __MODULE__.Example1.load_resources() + + assert results.user == "Mike" + assert results.org == "Acme" + end + + defmodule Example1 do + def load_resources do + Loader.load([ + {:user, &load_user/2}, + {:org, &load_org/2} + ]) + end + + defp load_user(_deps, _args), do: {:ok, "Mike"} + defp load_org(_deps, _args), do: {:ok, "Acme"} + end +end From 191183b7e8470d32d013b9d14551cdbb753dfa0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Wed, 12 Oct 2022 18:58:40 +0000 Subject: [PATCH 02/15] Process as a DAG --- lib/util/loader.ex | 60 +++++++++++++++++++++++++++++++++------ test/util/loader_test.exs | 19 +++++++++++++ 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 1b38730..2aad7be 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -1,19 +1,61 @@ defmodule Util.Loader do def load(tasks) do with :ok <- validate(tasks) do - {:ok, Enum.map(tasks, fn {name, fun} -> - Task.async(fn -> - case fun.([], []) do - {:ok, res} -> {name, res} - e -> {:error, e} - end - end) + execute(tasks, %{}) + end + end + + defp execute(tasks, results) do + {runnable, rest} = Enum.split_with(tasks, fn t -> + required_deps = deps(t) + resolved_deps = MapSet.new(Map.keys(results)) + + MapSet.subset?(required_deps, resolved_deps) + end) + + new_results = Enum.map(runnable, fn t -> + deps = extract_deps(t, results) + + Task.async(fn -> + case fun(t).(deps, []) do + {:ok, res} -> {name(t), res} + e -> {:error, e} + end end) - |> Task.await_many() - |> Enum.into(%{})} + end) + |> Task.await_many() + |> Enum.into(%{}) + + results = Map.merge(results, new_results) + + if rest == [] do + {:ok, results} + else + execute(rest, results) end end + defp name(task) do + elem(task, 0) + end + + defp fun(task) do + elem(task, 1) + end + + defp deps(task) do + case task do + {name, fun} -> MapSet.new([]) + {name, fun, opts} -> MapSet.new(Keyword.get(opts, :depends_on, [])) + end + end + + defp extract_deps(task, results) do + deps(task) |> Enum.map(fn d -> + {d, Map.get(results, d)} + end) |> Enum.into(%{}) + end + defp validate(tasks) do :ok end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index a8502b7..2beb921 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -10,6 +10,13 @@ defmodule Util.LoaderTest do assert results.org == "Acme" end + test "it can wait on dependencies" do + assert {:ok, results} = __MODULE__.Example2.load_resources() + + assert results.user == "Mike" + assert results.permissions == "Mike is an admin" + end + defmodule Example1 do def load_resources do Loader.load([ @@ -21,4 +28,16 @@ defmodule Util.LoaderTest do defp load_user(_deps, _args), do: {:ok, "Mike"} defp load_org(_deps, _args), do: {:ok, "Acme"} end + + defmodule Example2 do + def load_resources do + Loader.load([ + {:user, &load_user/2}, + {:permissions, &load_permissions/2, depends_on: [:user]} + ]) + end + + defp load_user(_deps, _args), do: {:ok, "Mike"} + defp load_permissions(%{user: user}, _args), do: {:ok, "#{user} is an admin"} + end end From 99ca68f4d938ebecf68fda308ba2dbf255d457a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Wed, 12 Oct 2022 20:11:23 +0000 Subject: [PATCH 03/15] Return errors --- lib/util/loader.ex | 34 +++++++++++++++++++++++++--------- test/util/loader_test.exs | 18 ++++++++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 2aad7be..a7d831b 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -17,24 +17,40 @@ defmodule Util.Loader do deps = extract_deps(t, results) Task.async(fn -> - case fun(t).(deps, []) do - {:ok, res} -> {name(t), res} - e -> {:error, e} - end + {name(t), fun(t).(deps, [])} end) end) |> Task.await_many() |> Enum.into(%{}) - results = Map.merge(results, new_results) + case process(new_results) do + {:ok, new_results} -> + results = Map.merge(results, new_results) + + if rest == [] do + {:ok, results} + else + execute(rest, results) + end + + {:error, new_results} -> + results = Map.merge(results, new_results) - if rest == [] do - {:ok, results} - else - execute(rest, results) + {:error, results} end end + defp process(raw_results) do + res = Enum.reduce(raw_results, {:ok, []}, fn r, {type, acc} -> + case r do + {name, {:ok, val}} -> {type, acc ++ [{name, val}]} + {name, e} -> {:error, acc ++ [{name, e}]} + end + end) + + {elem(res, 0), Enum.into(elem(res, 1), %{})} + end + defp name(task) do elem(task, 0) end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 2beb921..9703b3a 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -17,6 +17,12 @@ defmodule Util.LoaderTest do assert results.permissions == "Mike is an admin" end + test "it returns errors" do + assert {:error, results} = __MODULE__.Example3.load_resources() + + assert results.user == {:error, :not_found} + end + defmodule Example1 do def load_resources do Loader.load([ @@ -40,4 +46,16 @@ defmodule Util.LoaderTest do defp load_user(_deps, _args), do: {:ok, "Mike"} defp load_permissions(%{user: user}, _args), do: {:ok, "#{user} is an admin"} end + + defmodule Example3 do + def load_resources do + Loader.load([ + {:user, &load_user/2}, + {:org, &load_org/2} + ]) + end + + defp load_user(_deps, _args), do: {:error, :not_found} + defp load_org(_deps, _args), do: {:ok, "Acme"} + end end From 2b9a5d9d7ada605d6f003ee9ab3c3fb347e09959 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Wed, 12 Oct 2022 20:22:45 +0000 Subject: [PATCH 04/15] Return error if deps is missing --- lib/util/loader.ex | 17 ++++++++++++++++- test/util/loader_test.exs | 11 ++++++++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index a7d831b..579724b 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -73,6 +73,21 @@ defmodule Util.Loader do end defp validate(tasks) do - :ok + names = Enum.map(tasks, fn t -> name(t) end) + + with :ok <- all_deps_exists(names, tasks) do + :ok + end + end + + defp all_deps_exists(_, []), do: :ok + defp all_deps_exists(names, [task | rest]) do + missing = deps(task) |> Enum.find(fn d -> d not in names end) + + if missing do + {:error, :unknown_dependency, missing} + else + all_deps_exists(names, rest) + end end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 9703b3a..56864e8 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -17,12 +17,21 @@ defmodule Util.LoaderTest do assert results.permissions == "Mike is an admin" end - test "it returns errors" do + test "it can return errors" do assert {:error, results} = __MODULE__.Example3.load_resources() assert results.user == {:error, :not_found} end + test "it returns an error if an unknown dependency is required" do + resources = [ + {:a, fn _, _ -> {:ok, nil} end}, + {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, + ] + + assert {:error, :unknown_dependency, :c} = Loader.load(resources) + end + defmodule Example1 do def load_resources do Loader.load([ From 671d86e14167539edc558508f319dd64336475d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Wed, 12 Oct 2022 20:51:24 +0000 Subject: [PATCH 05/15] Add example of linear deps --- lib/util/loader.ex | 63 ++++++++++++++------------------------- test/util/loader_test.exs | 29 +++++++++++++++++- 2 files changed, 51 insertions(+), 41 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 579724b..02050fd 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -1,8 +1,6 @@ defmodule Util.Loader do def load(tasks) do - with :ok <- validate(tasks) do - execute(tasks, %{}) - end + execute(tasks, %{}) end defp execute(tasks, results) do @@ -13,30 +11,34 @@ defmodule Util.Loader do MapSet.subset?(required_deps, resolved_deps) end) - new_results = Enum.map(runnable, fn t -> - deps = extract_deps(t, results) + if runnable == [] && rest != [] do + {:error, :unprocessed, Enum.map(rest, fn r -> name(r) end)} + else + new_results = Enum.map(runnable, fn t -> + deps = extract_deps(t, results) - Task.async(fn -> - {name(t), fun(t).(deps, [])} + Task.async(fn -> + {name(t), fun(t).(deps, [])} + end) end) - end) - |> Task.await_many() - |> Enum.into(%{}) + |> Task.await_many() + |> Enum.into(%{}) - case process(new_results) do - {:ok, new_results} -> - results = Map.merge(results, new_results) + case process(new_results) do + {:ok, new_results} -> + results = Map.merge(results, new_results) - if rest == [] do - {:ok, results} - else - execute(rest, results) - end + if rest == [] do + {:ok, results} + else + execute(rest, results) + end - {:error, new_results} -> - results = Map.merge(results, new_results) + {:error, new_results} -> + results = Map.merge(results, new_results) - {:error, results} + {:error, results} + end end end @@ -71,23 +73,4 @@ defmodule Util.Loader do {d, Map.get(results, d)} end) |> Enum.into(%{}) end - - defp validate(tasks) do - names = Enum.map(tasks, fn t -> name(t) end) - - with :ok <- all_deps_exists(names, tasks) do - :ok - end - end - - defp all_deps_exists(_, []), do: :ok - defp all_deps_exists(names, [task | rest]) do - missing = deps(task) |> Enum.find(fn d -> d not in names end) - - if missing do - {:error, :unknown_dependency, missing} - else - all_deps_exists(names, rest) - end - end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 56864e8..5b449b1 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -15,6 +15,14 @@ defmodule Util.LoaderTest do assert results.user == "Mike" assert results.permissions == "Mike is an admin" + + resources = [ + {:a, fn _, _ -> {:ok, "a"} end}, + {:b, fn _, _ -> {:ok, "b"} end, depends_on: [:a]}, + {:c, fn _, _ -> {:ok, "c"} end, depends_on: [:b]}, + ] + + assert {:ok, %{a: "a", b: "b", c: "c"}} = Loader.load(resources) end test "it can return errors" do @@ -29,7 +37,26 @@ defmodule Util.LoaderTest do {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, ] - assert {:error, :unknown_dependency, :c} = Loader.load(resources) + assert {:error, :unprocessed, [:b]} = Loader.load(resources) + end + + test "it returns an error if there is a cycle in the deps" do + resources = [ + {:a, fn _, _ -> {:ok, nil} end}, + {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, + {:c, fn _, _ -> {:ok, nil} end, depends_on: [:b]}, + ] + + assert {:error, :unprocessed, [:b, :c]} = Loader.load(resources) + + resources = [ + {:a, fn _, _ -> {:ok, nil} end}, + {:b, fn _, _ -> {:ok, nil} end, depends_on: [:d]}, + {:c, fn _, _ -> {:ok, nil} end, depends_on: [:b]}, + {:d, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, + ] + + assert {:error, :unprocessed, [:b, :c, :d]} = Loader.load(resources) end defmodule Example1 do From 3e3a6fdb35b9f2db4ffa5027ff02c45bdd17a623 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sat, 15 Oct 2022 18:28:24 +0000 Subject: [PATCH 06/15] Add more tests --- lib/util/loader.ex | 23 +++++++++++- test/util/loader_test.exs | 77 +++++++++++++-------------------------- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 02050fd..7d6522e 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -1,6 +1,11 @@ defmodule Util.Loader do def load(tasks) do - execute(tasks, %{}) + with( + :ok <- check_deps_cycle(tasks), + :ok <- check_unknown_deps(tasks) + ) do + execute(tasks, %{}) + end end defp execute(tasks, results) do @@ -73,4 +78,20 @@ defmodule Util.Loader do {d, Map.get(results, d)} end) |> Enum.into(%{}) end + + defp check_deps_cycle(tasks) do + :ok + end + + defp check_unknown_deps(tasks) do + names = Enum.map(tasks, fn t -> name(t) end) + + Enum.find(tasks, fn task -> + Enum.any?(deps(task), fn d -> d not in names end) + end) + |> case do + nil -> :ok + _ -> {:error, :unknown_dependency} + end + end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 5b449b1..0401206 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -4,31 +4,42 @@ defmodule Util.LoaderTest do alias Util.Loader test "it can laod things in parallel" do - assert {:ok, results} = __MODULE__.Example1.load_resources() + assert {:ok, resources} = Loader.load([ + {:user, fn -> {:ok, "Mike"} end}, + {:org, fn -> {:ok, "Acme"} end} + ]) - assert results.user == "Mike" - assert results.org == "Acme" + assert resources.user == "Mike" + assert resources.org == "Acme" end test "it can wait on dependencies" do - assert {:ok, results} = __MODULE__.Example2.load_resources() + assert {:ok, resources} = Loader.load([ + {:user, fn -> {:ok, "Mike"} end}, + {:permissions, fn deps -> {:ok, "#{deps.user} is an admin"} end, depends_on: [:user]} + ]) - assert results.user == "Mike" - assert results.permissions == "Mike is an admin" + assert resources.user == "Mike" + assert resources.permissions == "Mike is an admin" + end - resources = [ - {:a, fn _, _ -> {:ok, "a"} end}, - {:b, fn _, _ -> {:ok, "b"} end, depends_on: [:a]}, - {:c, fn _, _ -> {:ok, "c"} end, depends_on: [:b]}, - ] + test "multiple tasks with dependencies" do + assert {:ok, resources} = Loader.load([ + {:a, fn -> {:ok, "a"} end}, + {:b, fn -> {:ok, "b"} end, depends_on: [:a]}, + {:c, fn -> {:ok, "c"} end, depends_on: [:b]}, + ]) assert {:ok, %{a: "a", b: "b", c: "c"}} = Loader.load(resources) end test "it can return errors" do - assert {:error, results} = __MODULE__.Example3.load_resources() + assert {:error, resources} = Loader.load([ + {:user, fn -> {:ok, "Mike"} end}, + {:org, fn -> {:error, :not_found} end} + ]) - assert results.user == {:error, :not_found} + assert resources.org == {:error, :not_found} end test "it returns an error if an unknown dependency is required" do @@ -37,7 +48,7 @@ defmodule Util.LoaderTest do {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, ] - assert {:error, :unprocessed, [:b]} = Loader.load(resources) + assert {:error, :unknown_dependency, [:c]} = Loader.load(resources) end test "it returns an error if there is a cycle in the deps" do @@ -56,42 +67,6 @@ defmodule Util.LoaderTest do {:d, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, ] - assert {:error, :unprocessed, [:b, :c, :d]} = Loader.load(resources) - end - - defmodule Example1 do - def load_resources do - Loader.load([ - {:user, &load_user/2}, - {:org, &load_org/2} - ]) - end - - defp load_user(_deps, _args), do: {:ok, "Mike"} - defp load_org(_deps, _args), do: {:ok, "Acme"} - end - - defmodule Example2 do - def load_resources do - Loader.load([ - {:user, &load_user/2}, - {:permissions, &load_permissions/2, depends_on: [:user]} - ]) - end - - defp load_user(_deps, _args), do: {:ok, "Mike"} - defp load_permissions(%{user: user}, _args), do: {:ok, "#{user} is an admin"} - end - - defmodule Example3 do - def load_resources do - Loader.load([ - {:user, &load_user/2}, - {:org, &load_org/2} - ]) - end - - defp load_user(_deps, _args), do: {:error, :not_found} - defp load_org(_deps, _args), do: {:ok, "Acme"} + assert {:error, :dependency_cycle} = Loader.load(resources) end end From 3cf48d05de82eebf6f9e33377accd505f8c1f023 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sat, 15 Oct 2022 19:15:56 +0000 Subject: [PATCH 07/15] Introce helper module for loading tasks --- lib/util/loader.ex | 64 +++++++++++++++++++++++++-------------- test/util/loader_test.exs | 10 +++--- 2 files changed, 47 insertions(+), 27 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 7d6522e..ee0ca41 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -1,5 +1,9 @@ defmodule Util.Loader do - def load(tasks) do + alias __MODULE__.LoadTask + + def load(definitions) do + tasks = Enum.map(definitions, &LoadTask.new/1) + with( :ok <- check_deps_cycle(tasks), :ok <- check_unknown_deps(tasks) @@ -10,21 +14,16 @@ defmodule Util.Loader do defp execute(tasks, results) do {runnable, rest} = Enum.split_with(tasks, fn t -> - required_deps = deps(t) - resolved_deps = MapSet.new(Map.keys(results)) - - MapSet.subset?(required_deps, resolved_deps) + MapSet.subset?(MapSet.new(t.deps), MapSet.new(Map.keys(results))) end) if runnable == [] && rest != [] do - {:error, :unprocessed, Enum.map(rest, fn r -> name(r) end)} + {:error, :unprocessed, Enum.map(rest, fn r -> r.id end)} else new_results = Enum.map(runnable, fn t -> deps = extract_deps(t, results) - Task.async(fn -> - {name(t), fun(t).(deps, [])} - end) + LoadTask.execute_async(t, deps) end) |> Task.await_many() |> Enum.into(%{}) @@ -62,19 +61,8 @@ defmodule Util.Loader do elem(task, 0) end - defp fun(task) do - elem(task, 1) - end - - defp deps(task) do - case task do - {name, fun} -> MapSet.new([]) - {name, fun, opts} -> MapSet.new(Keyword.get(opts, :depends_on, [])) - end - end - defp extract_deps(task, results) do - deps(task) |> Enum.map(fn d -> + task.deps |> Enum.map(fn d -> {d, Map.get(results, d)} end) |> Enum.into(%{}) end @@ -84,14 +72,44 @@ defmodule Util.Loader do end defp check_unknown_deps(tasks) do - names = Enum.map(tasks, fn t -> name(t) end) + names = Enum.map(tasks, &(&1.id)) Enum.find(tasks, fn task -> - Enum.any?(deps(task), fn d -> d not in names end) + Enum.any?(task.deps, fn d -> d not in names end) end) |> case do nil -> :ok _ -> {:error, :unknown_dependency} end end + + defmodule LoadTask do + def new({id, fun}) do + new({id, fun, []}) + end + + def new({id, fun, opts}) do + %{ + id: id, + fun: fun, + deps: Keyword.get(opts, :depends_on, []) + } + end + + def execute(task, deps) do + case :erlang.fun_info(task.fun)[:arity] do + 0 -> task.fun.() + 1 -> task.fun.(deps) + 2 -> task.fun.(deps, []) + end + end + + def execute_async(task, deps) do + Task.async(fn -> + res = execute(task, deps) + + {task.id, res} + end) + end + end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 0401206..061bc1b 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -26,11 +26,13 @@ defmodule Util.LoaderTest do test "multiple tasks with dependencies" do assert {:ok, resources} = Loader.load([ {:a, fn -> {:ok, "a"} end}, - {:b, fn -> {:ok, "b"} end, depends_on: [:a]}, - {:c, fn -> {:ok, "c"} end, depends_on: [:b]}, + {:b, fn deps -> {:ok, deps.a <> "b"} end, depends_on: [:a]}, + {:c, fn deps -> {:ok, deps.b <> "c"} end, depends_on: [:b]}, ]) - assert {:ok, %{a: "a", b: "b", c: "c"}} = Loader.load(resources) + assert resources.a == "a" + assert resources.b == "ab" + assert resources.c == "abc" end test "it can return errors" do @@ -58,7 +60,7 @@ defmodule Util.LoaderTest do {:c, fn _, _ -> {:ok, nil} end, depends_on: [:b]}, ] - assert {:error, :unprocessed, [:b, :c]} = Loader.load(resources) + assert {:error, :dependency_cycle} = Loader.load(resources) resources = [ {:a, fn _, _ -> {:ok, nil} end}, From d61806cd3e427485cdd6adbd1b05bc2110b24e67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sun, 16 Oct 2022 20:25:43 +0000 Subject: [PATCH 08/15] Check unknown deps --- lib/util/loader.ex | 14 ++++++++++---- test/util/loader_test.exs | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index ee0ca41..85c0b9a 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -74,12 +74,18 @@ defmodule Util.Loader do defp check_unknown_deps(tasks) do names = Enum.map(tasks, &(&1.id)) - Enum.find(tasks, fn task -> - Enum.any?(task.deps, fn d -> d not in names end) + tasks + |> Enum.map(fn task -> + unknown_deps = Enum.filter(task.deps, fn d -> d not in names end) + + {task.id, unknown_deps} + end) + |> Enum.filter(fn {id, unknown_deps} -> + unknown_deps != [] end) |> case do - nil -> :ok - _ -> {:error, :unknown_dependency} + [] -> :ok + e -> {:error, :unknown_dependency, Enum.into(e, %{})} end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 061bc1b..3bdc6f7 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -50,7 +50,7 @@ defmodule Util.LoaderTest do {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, ] - assert {:error, :unknown_dependency, [:c]} = Loader.load(resources) + assert {:error, :unknown_dependency, %{b: [:c]}} = Loader.load(resources) end test "it returns an error if there is a cycle in the deps" do From f5eec184856827c49f8413915a193b82febde3eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sun, 16 Oct 2022 20:52:15 +0000 Subject: [PATCH 09/15] Check for dependency cycles --- lib/util/loader.ex | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 85c0b9a..31fff13 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -5,8 +5,8 @@ defmodule Util.Loader do tasks = Enum.map(definitions, &LoadTask.new/1) with( - :ok <- check_deps_cycle(tasks), - :ok <- check_unknown_deps(tasks) + :ok <- check_unknown_deps(tasks), + :ok <- check_deps_cycle(tasks) ) do execute(tasks, %{}) end @@ -67,8 +67,21 @@ defmodule Util.Loader do end) |> Enum.into(%{}) end - defp check_deps_cycle(tasks) do - :ok + defp check_deps_cycle(tasks, visited \\ []) do + {visitable, rest} = Enum.split_with(tasks, fn t -> + MapSet.subset?(MapSet.new(t.deps), MapSet.new(visited)) + end) + + cond do + visitable == [] && rest == [] -> + :ok + + visitable == [] && rest != [] -> + {:error, :dependency_cycle} + + true -> + check_deps_cycle(rest, visited ++ Enum.map(visitable, &(&1.id))) + end end defp check_unknown_deps(tasks) do From 8d18cd0b1bbf7f18d2a752f35ac79da2cc3233dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sun, 16 Oct 2022 20:53:04 +0000 Subject: [PATCH 10/15] Add test case for empty load group --- test/util/loader_test.exs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 3bdc6f7..c86445c 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -3,6 +3,10 @@ defmodule Util.LoaderTest do alias Util.Loader + test "empty loaders return :ok" do + assert {:ok, %{}} = Loader.load([]) + end + test "it can laod things in parallel" do assert {:ok, resources} = Loader.load([ {:user, fn -> {:ok, "Mike"} end}, From 14a6b6120cf0f7372887f230a94f68d4dae28e64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sun, 16 Oct 2022 21:03:34 +0000 Subject: [PATCH 11/15] Handle exceptions --- lib/util/loader.ex | 29 ++++++++++++++++++----------- test/util/loader_test.exs | 27 ++++++++++++++++++--------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 31fff13..8d3a6aa 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -115,20 +115,27 @@ defmodule Util.Loader do } end - def execute(task, deps) do - case :erlang.fun_info(task.fun)[:arity] do - 0 -> task.fun.() - 1 -> task.fun.(deps) - 2 -> task.fun.(deps, []) - end - end - def execute_async(task, deps) do Task.async(fn -> - res = execute(task, deps) - - {task.id, res} + execute(task, deps) end) end + + defp execute(task, deps) do + Wormhole.capture(fn -> dispatch_call(task.fun, deps) end) + |> case do + {:ok, res} -> {task.id, res} + e -> {task.id, e} + end + end + + defp dispatch_call(fun, deps) do + case :erlang.fun_info(fun)[:arity] do + 0 -> fun.() + 1 -> fun.(deps) + 2 -> fun.(deps, []) + end + end + end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index c86445c..750b344 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -50,8 +50,8 @@ defmodule Util.LoaderTest do test "it returns an error if an unknown dependency is required" do resources = [ - {:a, fn _, _ -> {:ok, nil} end}, - {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, + {:a, fn -> {:ok, nil} end}, + {:b, fn -> {:ok, nil} end, depends_on: [:c]}, ] assert {:error, :unknown_dependency, %{b: [:c]}} = Loader.load(resources) @@ -59,20 +59,29 @@ defmodule Util.LoaderTest do test "it returns an error if there is a cycle in the deps" do resources = [ - {:a, fn _, _ -> {:ok, nil} end}, - {:b, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, - {:c, fn _, _ -> {:ok, nil} end, depends_on: [:b]}, + {:a, fn -> {:ok, nil} end}, + {:b, fn -> {:ok, nil} end, depends_on: [:c]}, + {:c, fn -> {:ok, nil} end, depends_on: [:b]}, ] assert {:error, :dependency_cycle} = Loader.load(resources) resources = [ - {:a, fn _, _ -> {:ok, nil} end}, - {:b, fn _, _ -> {:ok, nil} end, depends_on: [:d]}, - {:c, fn _, _ -> {:ok, nil} end, depends_on: [:b]}, - {:d, fn _, _ -> {:ok, nil} end, depends_on: [:c]}, + {:a, fn -> {:ok, nil} end}, + {:b, fn -> {:ok, nil} end, depends_on: [:d]}, + {:c, fn -> {:ok, nil} end, depends_on: [:b]}, + {:d, fn -> {:ok, nil} end, depends_on: [:c]}, ] assert {:error, :dependency_cycle} = Loader.load(resources) end + + test "it handles raised exceptions" do + assert {:error, resources} = Loader.load([ + {:a, fn -> raise "failure" end}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ]) + + assert resources.a == {:error, {:shutdown, %RuntimeError{message: "failure"}}} + end end From 2e2bfae31bce83fa9579911c6941582c46e1dec4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sun, 16 Oct 2022 21:18:35 +0000 Subject: [PATCH 12/15] Introduce result abstraction --- lib/util/loader.ex | 50 ++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 8d3a6aa..4e08a02 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -1,5 +1,6 @@ defmodule Util.Loader do alias __MODULE__.LoadTask + alias __MODULE__.Results def load(definitions) do tasks = Enum.map(definitions, &LoadTask.new/1) @@ -8,25 +9,21 @@ defmodule Util.Loader do :ok <- check_unknown_deps(tasks), :ok <- check_deps_cycle(tasks) ) do - execute(tasks, %{}) + execute(tasks, Results.new()) end end defp execute(tasks, results) do - {runnable, rest} = Enum.split_with(tasks, fn t -> - MapSet.subset?(MapSet.new(t.deps), MapSet.new(Map.keys(results))) - end) + {runnable, rest} = find_runnable(tasks, already_executed: Results.executed_task_ids(results)) if runnable == [] && rest != [] do {:error, :unprocessed, Enum.map(rest, fn r -> r.id end)} else - new_results = Enum.map(runnable, fn t -> - deps = extract_deps(t, results) - - LoadTask.execute_async(t, deps) - end) - |> Task.await_many() - |> Enum.into(%{}) + new_results = + runnable + |> Enum.map(fn t -> LoadTask.execute_async(t, Results.fetch(results, t.deps)) end) + |> Task.await_many() + |> Enum.into(%{}) case process(new_results) do {:ok, new_results} -> @@ -57,20 +54,8 @@ defmodule Util.Loader do {elem(res, 0), Enum.into(elem(res, 1), %{})} end - defp name(task) do - elem(task, 0) - end - - defp extract_deps(task, results) do - task.deps |> Enum.map(fn d -> - {d, Map.get(results, d)} - end) |> Enum.into(%{}) - end - defp check_deps_cycle(tasks, visited \\ []) do - {visitable, rest} = Enum.split_with(tasks, fn t -> - MapSet.subset?(MapSet.new(t.deps), MapSet.new(visited)) - end) + {visitable, rest} = find_runnable(tasks, already_executed: visited) cond do visitable == [] && rest == [] -> @@ -102,6 +87,14 @@ defmodule Util.Loader do end end + defp subset?(arr1, arr2) do + MapSet.subset?(MapSet.new(arr1), MapSet.new(arr2)) + end + + defp find_runnable(tasks, already_executed: ids) do + Enum.split_with(tasks, fn t -> subset?(t.deps, ids) end) + end + defmodule LoadTask do def new({id, fun}) do new({id, fun, []}) @@ -136,6 +129,15 @@ defmodule Util.Loader do 2 -> fun.(deps, []) end end + end + + defmodule Results do + def new(), do: %{} + def executed_task_ids(r), do: Map.keys(r) + def fetch(r, ids) do + Enum.filter(r, fn {k, v} -> k in ids end) |> Enum.into(%{}) + end end + end From 7bd2a11eff06784ba34042c1fd5065f69365cfa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Mon, 17 Oct 2022 18:57:31 +0000 Subject: [PATCH 13/15] Various timeouts on various levels --- lib/util/loader.ex | 51 +++++++++++++++++++++++++++++---------- test/util/loader_test.exs | 25 +++++++++++++++++++ 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 4e08a02..30eaa86 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -2,14 +2,25 @@ defmodule Util.Loader do alias __MODULE__.LoadTask alias __MODULE__.Results - def load(definitions) do - tasks = Enum.map(definitions, &LoadTask.new/1) - - with( - :ok <- check_unknown_deps(tasks), - :ok <- check_deps_cycle(tasks) - ) do - execute(tasks, Results.new()) + def load(definitions, opts \\ []) do + global_timeout = Keyword.get(opts, :whole_operation_timeout, :infinity) + per_resource_timeout = Keyword.get(opts, :per_resource_timeout, :infinity) + + Wormhole.capture(fn -> + tasks = Enum.map(definitions, fn definition -> + LoadTask.new(definition, per_resource_timeout: per_resource_timeout) + end) + + with( + :ok <- check_unknown_deps(tasks), + :ok <- check_deps_cycle(tasks) + ) do + execute(tasks, Results.new()) + end + end, timeout: global_timeout) + |> case do + {:ok, res} -> res + e -> e end end @@ -96,15 +107,29 @@ defmodule Util.Loader do end defmodule LoadTask do - def new({id, fun}) do - new({id, fun, []}) + def new({id, fun}, opts) do + new({id, fun, []}, opts) end - def new({id, fun, opts}) do + def new({id, fun, task_opts}, opts) do + per_resource_timeout = Keyword.get(opts, :per_resource_timeout, :infinity) + timeout = Keyword.get(task_opts, :timeout, :infinity) + + # localy defined timeout always takes priority over the ones + # defined on the whole load operation + timeout = + if timeout == :infinity do + per_resource_timeout + else + timeout + end + + %{ id: id, fun: fun, - deps: Keyword.get(opts, :depends_on, []) + deps: Keyword.get(task_opts, :depends_on, []), + timeout: timeout } end @@ -115,7 +140,7 @@ defmodule Util.Loader do end defp execute(task, deps) do - Wormhole.capture(fn -> dispatch_call(task.fun, deps) end) + Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, timeout: task.timeout) |> case do {:ok, res} -> {task.id, res} e -> {task.id, e} diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 750b344..820b809 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -84,4 +84,29 @@ defmodule Util.LoaderTest do assert resources.a == {:error, {:shutdown, %RuntimeError{message: "failure"}}} end + + test "it respects the timeout" do + assert {:error, {:timeout, 100}} = Loader.load([ + {:a, fn -> :timer.sleep(300) end}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ], whole_operation_timeout: 100) + end + + test "it respects per task timeout" do + assert {:error, resources} = Loader.load([ + {:a, fn -> :timer.sleep(300) end, timeout: 100}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ]) + + assert resources.a == {:error, {:timeout, 100}} + end + + test "it respects per task timeout defined on global level" do + assert {:error, resources} = Loader.load([ + {:a, fn -> :timer.sleep(300) end}, + {:b, fn -> {:ok, nil} end, depends_on: [:a]}, + ], per_resource_timeout: 100) + + assert resources.a == {:error, {:timeout, 100}} + end end From 16f909870191cc71fc522085ebbff15fd7334982 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Mon, 17 Oct 2022 19:11:09 +0000 Subject: [PATCH 14/15] Bootstrap fail-fast strategy --- lib/util/loader.ex | 18 +++++++++++++----- test/util/loader_test.exs | 10 ++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 30eaa86..44c3734 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -31,10 +31,18 @@ defmodule Util.Loader do {:error, :unprocessed, Enum.map(rest, fn r -> r.id end)} else new_results = - runnable - |> Enum.map(fn t -> LoadTask.execute_async(t, Results.fetch(results, t.deps)) end) - |> Task.await_many() - |> Enum.into(%{}) + try do + runnable + |> Enum.map(fn t -> LoadTask.execute_async(t, Results.fetch(results, t.deps)) end) + |> Task.await_many() + |> Enum.into(%{}) + rescue + e -> + IO.inspect("AAAAAAAAAAAAAAAAA", label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n") + IO.inspect(e, label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n") + %{} + end + case process(new_results) do {:ok, new_results} -> @@ -143,7 +151,7 @@ defmodule Util.Loader do Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, timeout: task.timeout) |> case do {:ok, res} -> {task.id, res} - e -> {task.id, e} + e -> raise "fail_fast" end end diff --git a/test/util/loader_test.exs b/test/util/loader_test.exs index 820b809..50f7b12 100644 --- a/test/util/loader_test.exs +++ b/test/util/loader_test.exs @@ -109,4 +109,14 @@ defmodule Util.LoaderTest do assert resources.a == {:error, {:timeout, 100}} end + + test "it support fail-fast" do + assert {:error, resources} = Loader.load([ + {:a, fn -> :timer.sleep(10000) end}, + {:b, fn -> raise "aaa" end} + ]) + + assert resources.b == {:error, {:shutdown, %RuntimeError{message: "aaa"}}} + assert resources.a == "AAA" + end end From 5863547d88817e7049008a048bdaf905ab5a6322 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sun, 30 Oct 2022 22:24:43 +0000 Subject: [PATCH 15/15] Extract loader task into dedicated module --- lib/util/loader.ex | 58 +++------------------------------- lib/util/loader/task.ex | 42 ++++++++++++++++++++++++ test/util/loader/task_test.exs | 52 ++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 53 deletions(-) create mode 100644 lib/util/loader/task.ex create mode 100644 test/util/loader/task_test.exs diff --git a/lib/util/loader.ex b/lib/util/loader.ex index 44c3734..f2ccdea 100644 --- a/lib/util/loader.ex +++ b/lib/util/loader.ex @@ -1,5 +1,5 @@ defmodule Util.Loader do - alias __MODULE__.LoadTask + alias __MODULE__.Task alias __MODULE__.Results def load(definitions, opts \\ []) do @@ -8,7 +8,9 @@ defmodule Util.Loader do Wormhole.capture(fn -> tasks = Enum.map(definitions, fn definition -> - LoadTask.new(definition, per_resource_timeout: per_resource_timeout) + {id, fun, opts} = definition + + Task.new(id, fun, opts) end) with( @@ -33,7 +35,7 @@ defmodule Util.Loader do new_results = try do runnable - |> Enum.map(fn t -> LoadTask.execute_async(t, Results.fetch(results, t.deps)) end) + |> Enum.map(fn t -> Task.execute_async(t, Results.fetch(results, t.deps)) end) |> Task.await_many() |> Enum.into(%{}) rescue @@ -43,7 +45,6 @@ defmodule Util.Loader do %{} end - case process(new_results) do {:ok, new_results} -> results = Map.merge(results, new_results) @@ -114,55 +115,6 @@ defmodule Util.Loader do Enum.split_with(tasks, fn t -> subset?(t.deps, ids) end) end - defmodule LoadTask do - def new({id, fun}, opts) do - new({id, fun, []}, opts) - end - - def new({id, fun, task_opts}, opts) do - per_resource_timeout = Keyword.get(opts, :per_resource_timeout, :infinity) - timeout = Keyword.get(task_opts, :timeout, :infinity) - - # localy defined timeout always takes priority over the ones - # defined on the whole load operation - timeout = - if timeout == :infinity do - per_resource_timeout - else - timeout - end - - - %{ - id: id, - fun: fun, - deps: Keyword.get(task_opts, :depends_on, []), - timeout: timeout - } - end - - def execute_async(task, deps) do - Task.async(fn -> - execute(task, deps) - end) - end - - defp execute(task, deps) do - Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, timeout: task.timeout) - |> case do - {:ok, res} -> {task.id, res} - e -> raise "fail_fast" - end - end - - defp dispatch_call(fun, deps) do - case :erlang.fun_info(fun)[:arity] do - 0 -> fun.() - 1 -> fun.(deps) - 2 -> fun.(deps, []) - end - end - end defmodule Results do def new(), do: %{} diff --git a/lib/util/loader/task.ex b/lib/util/loader/task.ex new file mode 100644 index 0000000..cfde992 --- /dev/null +++ b/lib/util/loader/task.ex @@ -0,0 +1,42 @@ +defmodule Util.Loader.Task do + @moduledoc """ + An individual loading task. + """ + + defstruct [ + :id, + :fun, + :deps, + :timeout + ] + + def new(id, fun, opts) do + timeout = Keyword.get(opts, :timeout, :infinity) + deps = Keyword.get(opts, :depends_on, []) + + {:ok, struct(__MODULE__, + id: id, + fun: fun, + deps: deps, + timeout: timeout + )} + end + + def execute(task, deps) do + Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, [timeout: task.timeout]) + |> case do + {:ok, {:ok, res}} -> {:ok, res} + {:ok, {:error, err}} -> {:error, err} + {:ok, other} -> {:error, :unexpected_result_type, other} + {:error, err} -> {:error, err} + end + end + + defp dispatch_call(fun, deps) do + case :erlang.fun_info(fun)[:arity] do + 0 -> fun.() + 1 -> fun.(deps) + 2 -> fun.(deps, []) + end + end +end diff --git a/test/util/loader/task_test.exs b/test/util/loader/task_test.exs new file mode 100644 index 0000000..070e510 --- /dev/null +++ b/test/util/loader/task_test.exs @@ -0,0 +1,52 @@ +defmodule Util.Loader.TaskTest do + use ExUnit.Case, async: true + + test "construction" do + assert {:ok, _} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, depends_on: [:b, :d]) + end + + describe "execution" do + test "ok result" do + assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, []) + assert {:ok, "A"} = Util.Loader.Task.execute(task, %{}) + end + + test "error result" do + assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:error, "B"} end, []) + assert {:error, "B"} = Util.Loader.Task.execute(task, %{}) + end + + test "exception result" do + fun = fn -> raise "AAA" end + + assert {:ok, task} = Util.Loader.Task.new(:a, fun, []) + assert {:error, {:shutdown, %RuntimeError{message: "AAA"}}} = Util.Loader.Task.execute(task, %{}) + end + + test "non-tuple result" do + assert {:ok, task} = Util.Loader.Task.new(:a, fn -> "A" end, []) + assert {:error, :unexpected_result_type, "A"} = Util.Loader.Task.execute(task, %{}) + end + + test "timeout result" do + fun = fn -> + :timer.sleep(1000) + {:ok, "A"} + end + + assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100) + assert {:error, {:timeout, 100}} = Util.Loader.Task.execute(task, []) + end + + test "with deps" do + deps = %{a: "A", b: "B"} + + fun = fn deps -> + {:ok, deps.a <> deps.b} + end + + assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100) + assert {:ok, "AB"} = Util.Loader.Task.execute(task, deps) + end + end +end