From d76183b4d4b0ce8f1339710ace1c4ebb93409707 Mon Sep 17 00:00:00 2001 From: Yuriy Zdyrko Date: Wed, 4 Mar 2020 14:01:44 +0200 Subject: [PATCH 1/2] upgrade of all dependencies --- .formatter.exs | 4 + README.md | 364 +++++++++--------- config/config.exs | 13 +- lib/genstage_example.ex | 38 -- lib/genstage_example/application.ex | 39 ++ lib/genstage_example/consumer.ex | 12 +- lib/genstage_example/producer.ex | 26 +- lib/genstage_example/runner.ex | 15 + lib/genstage_example/task.ex | 8 +- lib/genstage_example/task_db_interface.ex | 39 +- lib/repo.ex | 3 +- mix.exs | 50 ++- mix.lock | 24 +- .../migrations/20161023200119_add_tasks.exs | 6 +- 14 files changed, 338 insertions(+), 303 deletions(-) create mode 100644 .formatter.exs delete mode 100644 lib/genstage_example.ex create mode 100644 lib/genstage_example/application.ex create mode 100644 lib/genstage_example/runner.ex diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/README.md b/README.md index e396036..8f16435 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ To start, all we need to do is add `gen_stage` to our deps in `mix.deps`. . . . defp deps do [ - {:gen_stage, "~> 0.7"}, + {:gen_stage, "~> 1.0"}, ] end . . . @@ -58,10 +58,9 @@ Now we can add the code: ```elixir defmodule GenstageExample.Producer do - alias Experimental.GenStage use GenStage - def start_link do + def start_link(_) do GenStage.start_link(__MODULE__, 0, name: __MODULE__) # naming allows us to handle failure end @@ -86,14 +85,11 @@ To begin with, we have our initial declarations: ```elixir . . . defmodule GenstageExample.Producer do - alias Experimental.GenStage use GenStage . . . ``` What this does is a couple simple things. -First, we declare our module, and soon after we alias `Experimental.GenStage`. -This is simply because we will be calling it more than once and makes it more convenient. The `use GenStage` line is much akin to `use GenServer`. This line allows us to import the default behaviour and functions to save us from a large amount of boilerplate. @@ -101,8 +97,8 @@ If we go further, we see the first two primary functions for startup: ```elixir . . . - def start_link do - GenStage.start_link(__MODULE__, :the_state_doesnt_matter) + def start_link(_) do + GenStage.start_link(__MODULE__, :the_state_doesnt_matter, name: __MODULE__) end def init(counter) do @@ -112,8 +108,8 @@ If we go further, we see the first two primary functions for startup: ``` These two functions offer a very simple start. -First, we have our standard `start_link/0` function. -Inside here, we use`GenStage.start_link/` beginning with our argument `__MODULE__`, which will give it the name of our current module. +First, we have our standard `start_link/1` function. +Inside here, we use`GenStage.start_link/3` passing current `__MODULE__` as a first argument. Next, we set a state, which is arbitrary in this case, and can be any value. The `__MODULE__` argument is used for name registration like any other module. The second argument is the arguments, which in this case are meaningless as we do not care about it. @@ -141,10 +137,9 @@ We'll start by showing all the code and then break it down. ```elixir defmodule GenstageExample.Consumer do - alias Experimental.GenStage use GenStage - def start_link do + def start_link(_) do GenStage.start_link(__MODULE__, :state_doesnt_matter) end @@ -165,10 +160,9 @@ To start, let's look at the beginning functions just like last time: ```elixir defmodule GenstageExample.Consumer do - alias Experimental.GenStage use GenStage - def start_link do + def start_link(_) do GenStage.start_link(__MODULE__, :state_doesnt_matter) end @@ -178,8 +172,8 @@ defmodule GenstageExample.Consumer do . . . ``` -To begin, much like in our producer, we set up our `start_link/0` and `init/1` functions. -In `start_link` we simple register the module name like last time, and set a state. +To begin, much like in our producer, we set up our `start_link/1` and `init/1` functions. +In `start_link/1` we simple register the module name like last time, and set a state. The state is arbitrary for the consumer, and can be literally whatever we please, in this case `:state_doesnt_matter`. In `init/1` we simply take the state and set up our expected tuple. @@ -208,13 +202,13 @@ After that, we don't reply because we are a consumer and do not handle anything, ## Wiring It Together To get all of this to work we only have to make one simple change. -Open up `lib/genstage_example.ex` and we can add them as workers and they will automatically start with our application: +Open up `lib/genstage_example/application.ex` and we can add them as workers and they will automatically start with our application: ```elixir . . . children = [ - worker(GenstageExample.Producer, []), - worker(GenstageExample.Consumer, []), + {GenstageExample.Producer, []}, + {GenstageExample.Consumer, []} ] . . . ``` @@ -245,14 +239,14 @@ Now, what if we wanted multiple consumers? Right now, if we examine the `IO.inspect/1` output, we see that every single event is handled by a single PID. This isn't very Elixir-y. We have massive concurrency built-in, we should probably leverage that as much as possible. -Let's make some adjustments so that we can have multiple workers by modifying `lib/genstage_example.ex` +Let's make some adjustments so that we can have multiple workers by modifying `lib/genstage_example/application.ex` ```elixir . . . children = [ - worker(GenstageExample.Producer, []), - worker(GenstageExample.Consumer, [], id: 1), - worker(GenstageExample.Consumer, [], id: 2), + {GenstageExample.Producer, []}, + Supervisor.child_spec({GenstageExample.Consumer, []}, id: 1), + Supervisor.child_spec({GenstageExample.Consumer, []}, id: 2) ] . . . ``` @@ -274,11 +268,11 @@ But we can take this even further: ```elixir . . . children = [ - worker(GenstageExample.Producer, []), + {GenstageExample.Producer, []}, ] consumers = for id <- 1..(System.schedulers_online * 12) do # helper to get the number of cores on machine - worker(GenstageExample.Consumer, [], id: id) + Supervisor.child_spec({GenstageExample.Consumer, []}, id: id) end opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] @@ -357,9 +351,9 @@ To get started let's add it and the Postgresql adapter to `mix.exs`: . . . defp deps do [ - {:gen_stage, "~> 0.7"}, - {:ecto, "~> 2.0"}, - {:postgrex, "~> 0.12.1"}, + {:ecto_sql, "~> 3.0"}, + {:postgrex, "~> 0.15.3"}, + {:gen_stage, "~> 1.0"}, ] end . . . @@ -372,32 +366,20 @@ Fetch the dependencies and compile: $ mix do deps.get, compile ``` -And now we can add a repo for setup in `lib/repo.ex`: - -```elixir -defmodule GenstageExample.Repo do - use Ecto.Repo, - otp_app: :genstage_example -end +```shell +$ mix ecto.gen.repo GenstageExample.Repo ``` +This command will: +- create lib/repo.ex +- update config/config.exs -and with this we can set up our config next in `config/config.exs`: +Add this line to `config/config.exs`: ```elixir -use Mix.Config - config :genstage_example, ecto_repos: [GenstageExample.Repo] - -config :genstage_example, GenstageExample.Repo, - adapter: Ecto.Adapters.Postgres, - database: "genstage_example", - username: "your_username", - password: "your_password", - hostname: "localhost", - port: "5432" ``` -And if we add a supservisor to `lib/genstage_example.ex` we can now start working with the DB: +Add Repo to Supervision tree of Application: ```elixir . . . @@ -405,26 +387,31 @@ And if we add a supservisor to `lib/genstage_example.ex` we can now start workin import Supervisor.Spec, warn: false children = [ - supervisor(GenstageExample.Repo, []), - worker(GenstageExample.Producer, []), + {Repo, []}, + {GenstageExample.Producer, []}, ] end . . . ``` -But we should also make an interface to do that, so let's import our query interface and repo to the producer: +Now we need to create our migration: -```elixir -. . . - import Ecto.Query - import GenstageExample.Repo -. . . +```shell +$ mix ecto.gen.migration setup_tasks ``` -Now we need to create our migration: +Inside a migration (`/priv/_...setup_tasks.exs`), add: +```elixir +. . . + def change do + create table(:tasks) do + add :payload, :binary, null: false + add :status, :string, default: "waiting", null: false -```shell -$ mix ecto.gen.migration setup_tasks status:text payload:binary + timestamps(updated_at: false, inserted_at: false) + end + end +. . . ``` Now that we have a functional database, we can start storing things. @@ -447,8 +434,12 @@ To start, let's create our `Task` module to model our actual tasks to be run: ```elixir defmodule GenstageExample.Task do - def enqueue(status, payload) do - GenstageExample.TaskDBInterface.insert_tasks(status, payload) + def enqueue(list) when is_list(list) do + GenstageExample.TaskDBInterface.insert_tasks(list) + end + + def enqueue(payload) do + GenstageExample.TaskDBInterface.insert_tasks([payload]) end def take(limit) do @@ -462,7 +453,7 @@ We only have 2 functions. Now, the module they are calling doesn't exist yet, it gives us the ideas we need to build a very simple interface. These can be broken down as follows: -1. `enqueue/2` - Enqueue a task to be run +1. `enqueue/1` - Enqueue a task to be run 3. `take/1` - Take a given number of tasks to run from the database Now this gives us the interface we need: we can set things to be run, and grab tasks to be run and we can define the rest of the interface. @@ -471,36 +462,55 @@ Let's create an interface with our database in its own module: ```elixir defmodule GenstageExample.TaskDBInterface do import Ecto.Query + alias GenstageExample.Repo def take_tasks(limit) do {:ok, {count, events}} = - GenstageExample.Repo.transaction fn -> - ids = GenstageExample.Repo.all waiting(limit) - GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]] - end + GenstageExample.Repo.transaction(fn -> + ids = GenstageExample.Repo.all(waiting(limit)) + + GenstageExample.Repo.update_all(by_ids(ids), set: [status: "running"]) + end) + {count, events} end - def insert_tasks(status, payload) do - GenstageExample.Repo.insert_all "tasks", [ - %{status: status, payload: payload} - ] + def insert_tasks(list) do + tasks = + list + |> Enum.map(fn payload = {_m, _f, _a} -> + payload = construct_payload(payload) + %{status: "waiting", payload: payload} + end) + + Repo.insert_all("tasks", tasks) end def update_task_status(id, status) do - GenstageExample.Repo.update_all by_ids([id]), set: [status: status] + Repo.update_all(by_ids([id]), set: [status: status]) end defp by_ids(ids) do - from t in "tasks", where: t.id in ^ids + from(t in "tasks", + where: t.id in ^ids, + select: %{ + id: t.id, + payload: t.payload + } + ) end defp waiting(limit) do - from t in "tasks", + from(t in "tasks", where: t.status == "waiting", limit: ^limit, select: t.id, lock: "FOR UPDATE SKIP LOCKED" + ) + end + + defp construct_payload(mfa = {module, function, args}) do + :erlang.term_to_binary(mfa) end end ``` @@ -521,10 +531,12 @@ Let's look at the code: . . . def take_tasks(limit) do {:ok, {count, events}} = - GenstageExample.Repo.transaction fn -> - ids = GenstageExample.Repo.all waiting(limit) - GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]] - end + GenstageExample.Repo.transaction(fn -> + ids = GenstageExample.Repo.all(waiting(limit)) + + GenstageExample.Repo.update_all(by_ids(ids), set: [status: "running"]) + end) + {count, events} end . . . @@ -540,10 +552,13 @@ Next we have `insert_tasks/2`: ```elixir . . . - def insert_tasks(status, payload) do - GenstageExample.Repo.insert_all "tasks", [ - %{status: status, payload: payload} - ] + def insert_tasks(list) do + Repo.insert_all( + "tasks", + Enum.map(list, fn payload = {_m, _f, _a} -> + %{status: "waiting", payload: construct_payload(payload)} + end) + ) end . . . ``` @@ -556,7 +571,7 @@ Finally, we have `update_task_status/2`, which is also quite simple: ```elixir . . . def update_task_status(id, status) do - GenstageExample.Repo.update_all by_ids([id]), set: [status: status] + Repo.update_all(by_ids([id]), set: [status: status]) end . . . ``` @@ -569,15 +584,22 @@ Our helpers are all called primarily inside of `take_tasks/1`, but also used els ```elixir . . . defp by_ids(ids) do - from t in "tasks", where: t.id in ^ids + from(t in "tasks", + where: t.id in ^ids, + select: %{ + id: t.id, + payload: t.payload + } + ) end defp waiting(limit) do - from t in "tasks", + from(t in "tasks", where: t.status == "waiting", limit: ^limit, select: t.id, lock: "FOR UPDATE SKIP LOCKED" + ) end . . . ``` @@ -595,44 +617,44 @@ Now that we have our DB interface defined as it is used in the primary API, we c ### Producer, Consumer, and Final Configuration #### Final Config -We will need to do a bit of configuration in `lib/genstage_example.ex` to clarify things as well as give us the final functionalities we will need to run jobs. +We will need to do a bit of configuration in `lib/genstage_example/application.ex` to clarify things as well as give us the final functionalities we will need to run jobs. This is what we will end up with: ```elixir . . . def start(_type, _args) do - import Supervisor.Spec, warn: false - # 12 workers / system core - consumers = for id <- (0..System.schedulers_online * 12) do - worker(GenstageExample.Consumer, [], id: id) - end + # 12 workers / system core + consumers = + for id <- 0..(System.schedulers_online() * 12) do + Supervisor.child_spec({Consumer, []}, id: id) + end + producers = [ - worker(Producer, []), - ] + {Producer, []} + ] supervisors = [ - supervisor(GenstageExample.Repo, []), - supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]), - ] + {Repo, []}, + {Task.Supervisor, name: GenstageExample.TaskSupervisor} + ] + children = supervisors ++ producers ++ consumers opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] Supervisor.start_link(children, opts) end - def start_later(module, function, args) do - payload = {module, function, args} |> :erlang.term_to_binary - Repo.insert_all("tasks", [ - %{status: "waiting", payload: payload} - ]) - notify_producer + def start_later(list) do + GenstageExample.Task.enqueue(list) + notify_producer() end - def notify_producer do - send(Producer, :data_inserted) + def start_later(module, function, args) do + GenstageExample.Task.enqueue({module, function, args}) + notify_producer() end - defdelegate enqueue(module, function, args), to: Producer + defdelegate notify_producer(), to: Producer . . . ``` @@ -642,19 +664,20 @@ First, `start/2`: ```elixir . . . def start(_type, _args) do - import Supervisor.Spec, warn: false - # 12 workers / system core - consumers = for id <- (0..System.schedulers_online * 12) do - worker(GenstageExample.Consumer, [], id: id) - end + consumers = + for id <- 0..(System.schedulers_online() * 12) do + Supervisor.child_spec({Consumer, []}, id: id) + end + producers = [ - worker(Producer, []), - ] + {Producer, []} + ] supervisors = [ - supervisor(GenstageExample.Repo, []), - supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]), - ] + {Repo, []}, + {Task.Supervisor, name: GenstageExample.TaskSupervisor} + ] + children = supervisors ++ producers ++ consumers opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] @@ -674,102 +697,62 @@ Next, we have `start_later/3`: ```elixir . . . + def start_later(list) do + GenstageExample.Task.enqueue(list) + notify_producer() + end + def start_later(module, function, args) do - payload = {module, function, args} |> :erlang.term_to_binary - Repo.insert_all("tasks", [ - %{status: "waiting", payload: payload} - ]) - notify_producer + GenstageExample.Task.enqueue({module, function, args}) + notify_producer() end + + defdelegate notify_producer(), to: Producer . . . ``` -This function takes a module, a function, and an argument. -It then encodes them as a binary using some built-in erlang magic. -From here, we then insert the task as `waiting`, and we notify a producer that a task has been inserted to run. +We then insert the task as `waiting`, and notify a producer that a task has been inserted to run. +`defdelegate notify_producer()` means method in Producer module will be invoked. -Now let's check out `notify_producer/0`: +`notify_producer/0` in /lib/producer.ex: ```elixir . . . def notify_producer do - send(Producer, :data_inserted) + GenStage.cast(@name, :data_inserted) end . . . ``` This method is quite simple. -We send our producer a message, `:data_inserted`, simply so that it knows what we did. +We cast our producer a message, `:data_inserted`, simply so that it knows what we did. The message here is arbitrary, but I chose this atom to make the meaning clear. -Last, but not least we do some simple delegation: - -```elixir -. . . - defdelegate enqueue(module, functions, args), to : Producer -. . . -``` -This simply makes it so if we call `GenstageExample.enqueue(module, function, args)` that it will be delegated to the same method in our producer. - ### Producer Setup Our producer doesn't need a ton of work. first, we'll alter our `handle_demand/2` to actually do something with our events: ```elixir . . . - def handle_demand(demand, state) when demand > 0 do + def handle_demand(demand, state) do serve_jobs(demand + state) end . . . ``` -We haven't defined `serve_jobs/2` yet, but we'll get there. -The concept is simple, when we get a demand and demand is > 0, we do some work to the tune of demand + the current state's number of jobs. - -Now that we will be sending a message to the producer when we run `start_later/3`, we will want to respond to it with a `handle_info/2` call: - -```elixir -. . . - def handle_info(:enqueued, state) do - {count, events} = GenstageExample.Task.take(state) - {:noreply, events, state - count} - end -. . . -``` - With this, we simply respond by taking the number of tasks we are told to get ready to run. Now let's define `serve_jobs/1`: ```elixir . . . - def serve_jobs limit do + def serve_jobs(limit) do {count, events} = GenstageExample.Task.take(limit) - Process.send_after(@name, :enqueued, 60_000) {:noreply, events, limit - count} end . . . ``` -Now, we are sending a process in one minute that to our producer telling it that it should respond to `:enqueued`. -Note that we call the process module with `@name`, which we will need to add at the top as a module attribute: - -```elixir -. . . - @name __MODULE__ -. . . -``` - -Let's define that last function to handle the `:enqueued` message now, too: - -```elixir -. . . - def handle_cast(:enqueued, state) do - serve_jobs(state) - end -. . . -``` - -This will simply serve jobs when we tell the producer they have `state` number of enqueued and to respond. +`serve_jobs/1` will be invoked ## Setting Up the Consumer for Real Work Our consumer is where we do the work. @@ -787,6 +770,7 @@ The core of the consumer is `handle_events/3`, lets flesh out the functionality task = start_task(module, function, args) yield_to_and_update_task(task, id) end + {:noreply, [], state} end . . . @@ -870,11 +854,10 @@ From this, we can see our finalized consumer: ```elixir defmodule GenstageExample.Consumer do - alias Experimental.GenStage use GenStage - alias GenstageExample.{Producer, TaskSupervisor} + alias GenstageExample.{Producer} - def start_link do + def start_link(_) do GenStage.start_link(__MODULE__, :state_doesnt_matter) end @@ -889,18 +872,12 @@ defmodule GenstageExample.Consumer do task = start_task(module, function, args) yield_to_and_update_task(task, id) end - {:noreply, [], state} - end - defp yield_to_and_update_task(task, id) do - task - |> Task.yield(1000) - |> yield_to_status(task) - |> update(id) + {:noreply, [], state} end defp start_task(mod, func, args) do - Task.Supervisor.async_nolink(TaskSupervisor, mod , func, args) + Task.Supervisor.async_nolink(GenstageExample.TaskSupervisor, mod, func, args) end defp yield_to_status({:ok, _}, _) do @@ -920,17 +897,27 @@ defmodule GenstageExample.Consumer do GenstageExample.TaskDBInterface.update_task_status(id, status) end - defp deconstruct_payload payload do - payload |> :erlang.binary_to_term + defp yield_to_and_update_task(task, id) do + task + |> Task.yield(1000) + |> yield_to_status(task) + |> update(id) + end + + defp deconstruct_payload(payload) do + payload |> :erlang.binary_to_term() end end ``` -Now, if we go into IEx: +### How to run it + +#### Manually add tasks: ```elixir $ iex -S mix -iex> GenstageExample.enqueue(IO, :puts, ["wuddup"]) +iex> GenstageExample.start_later(IO, :puts, ["wuddup"]) + #=> 16:39:31.014 [debug] QUERY OK db=137.4ms INSERT INTO "tasks" ("payload","status") VALUES ($1,$2) [<<131, 104, 3, 100, 0, 9, 69, 108, 105, 120, 105, 114, 46, 73, 79, 100, 0, 4, 112, 117, 116, 115, 108, 0, 0, 0, 1, 109, 0, 0, 0, 6, 119, 117, 100, 100, 117, 112, 106>>, "waiting"] @@ -954,3 +941,10 @@ UPDATE "tasks" AS t0 SET "status" = $1 WHERE (t0."id" = ANY($2)) ["success", [5] ``` It works and we are storing and running tasks! + +#### Or just add 2000 tasks automatically: + +```elixir +$ iex -S mix +iex> GenstageExample.Runner.run() +``` \ No newline at end of file diff --git a/config/config.exs b/config/config.exs index c85213a..ed67687 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,12 +1,9 @@ -use Mix.Config +import Config config :genstage_example, ecto_repos: [GenstageExample.Repo] config :genstage_example, GenstageExample.Repo, - adapter: Ecto.Adapters.Postgres, - database: "genstage_example", - username: "bobdawg", - password: "", - hostname: "localhost", - port: "5432" - + database: "genstage_example_repo", + username: "postgres", + password: "postgres", + hostname: "localhost" diff --git a/lib/genstage_example.ex b/lib/genstage_example.ex deleted file mode 100644 index 4e10af5..0000000 --- a/lib/genstage_example.ex +++ /dev/null @@ -1,38 +0,0 @@ -defmodule GenstageExample do - use Application - - alias GenstageExample.{Producer, Repo} - def start(_type, _args) do - import Supervisor.Spec, warn: false - # 12 workers / system core - consumers = for id <- (0..System.schedulers_online * 12) do - worker(GenstageExample.Consumer, [], id: id) - end - producers = [ - worker(Producer, []), - ] - - supervisors = [ - supervisor(GenstageExample.Repo, []), - supervisor(Task.Supervisor, [[name: GenstageExample.TaskSupervisor]]), - ] - children = supervisors ++ producers ++ consumers - - opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] - Supervisor.start_link(children, opts) - end - - def start_later(module, function, args) do - payload = {module, function, args} |> :erlang.term_to_binary - Repo.insert_all("tasks", [ - %{status: "waiting", payload: payload} - ]) - notify_producer - end - - def notify_producer do - send(Producer, :data_inserted) - end - - defdelegate enqueue(module, function, args), to: Producer -end diff --git a/lib/genstage_example/application.ex b/lib/genstage_example/application.ex new file mode 100644 index 0000000..b7de823 --- /dev/null +++ b/lib/genstage_example/application.ex @@ -0,0 +1,39 @@ +defmodule GenstageExample do + use Application + + alias GenstageExample.{Producer, Consumer, Repo} + + def start(_type, _args) do + # 12 workers / system core + consumers = + for id <- 0..(System.schedulers_online() * 12) do + Supervisor.child_spec({Consumer, []}, id: id) + end + + producers = [ + {Producer, []} + ] + + supervisors = [ + {Repo, []}, + {Task.Supervisor, name: GenstageExample.TaskSupervisor} + ] + + children = supervisors ++ producers ++ consumers + + opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] + Supervisor.start_link(children, opts) + end + + def start_later(list) do + GenstageExample.Task.enqueue(list) + notify_producer() + end + + def start_later(module, function, args) do + GenstageExample.Task.enqueue({module, function, args}) + notify_producer() + end + + defdelegate notify_producer(), to: Producer +end diff --git a/lib/genstage_example/consumer.ex b/lib/genstage_example/consumer.ex index f03e3ec..22fc6f5 100644 --- a/lib/genstage_example/consumer.ex +++ b/lib/genstage_example/consumer.ex @@ -1,9 +1,8 @@ defmodule GenstageExample.Consumer do - alias Experimental.GenStage use GenStage - alias GenstageExample.{Producer, TaskSupervisor} + alias GenstageExample.{Producer} - def start_link do + def start_link(_) do GenStage.start_link(__MODULE__, :state_doesnt_matter) end @@ -18,11 +17,12 @@ defmodule GenstageExample.Consumer do task = start_task(module, function, args) yield_to_and_update_task(task, id) end + {:noreply, [], state} end defp start_task(mod, func, args) do - Task.Supervisor.async_nolink(TaskSupervisor, mod , func, args) + Task.Supervisor.async_nolink(GenstageExample.TaskSupervisor, mod, func, args) end defp yield_to_status({:ok, _}, _) do @@ -49,7 +49,7 @@ defmodule GenstageExample.Consumer do |> update(id) end - defp deconstruct_payload payload do - payload |> :erlang.binary_to_term + defp deconstruct_payload(payload) do + payload |> :erlang.binary_to_term() end end diff --git a/lib/genstage_example/producer.ex b/lib/genstage_example/producer.ex index d4f9f1b..d94878c 100644 --- a/lib/genstage_example/producer.ex +++ b/lib/genstage_example/producer.ex @@ -1,25 +1,21 @@ defmodule GenstageExample.Producer do - alias Experimental.GenStage use GenStage @name __MODULE__ - def start_link do - GenStage.start_link(__MODULE__, 0, name: @name) + def start_link(_) do + GenStage.start_link(@name, 0, name: @name) end def init(counter) do {:producer, counter} end - def enqueue(module, function, args) do - payload = {module, function, args} |> construct_payload - GenstageExample.Task.enqueue("waiting", payload) - Process.send(@name, :enqueued, []) - :ok + def notify_producer do + GenStage.cast(@name, :data_inserted) end - def handle_cast(:enqueued, state) do + def handle_cast(:data_inserted, state) do serve_jobs(state) end @@ -27,18 +23,8 @@ defmodule GenstageExample.Producer do serve_jobs(demand + state) end - def handle_info(:enqueued, state) do - {count, events} = GenstageExample.Task.take(state) - {:noreply, events, state - count} - end - - def serve_jobs limit do + def serve_jobs(limit) do {count, events} = GenstageExample.Task.take(limit) - Process.send_after(@name, :enqueued, 60_000) {:noreply, events, limit - count} end - - defp construct_payload({module, function, args}) do - {module, function, args} |> :erlang.term_to_binary - end end diff --git a/lib/genstage_example/runner.ex b/lib/genstage_example/runner.ex new file mode 100644 index 0000000..679a105 --- /dev/null +++ b/lib/genstage_example/runner.ex @@ -0,0 +1,15 @@ +defmodule GenstageExample.Runner do + alias GenstageExample, as: App + + def run do + 0..2000 + |> Enum.map(fn i -> + {__MODULE__, :do_work, [i]} + end) + |> App.start_later() + end + + def do_work(i) do + IO.puts("Running :do_work(#{inspect(i)}) in PID #{inspect(self())}") + end +end diff --git a/lib/genstage_example/task.ex b/lib/genstage_example/task.ex index 8d9fbc1..3dd4e9e 100644 --- a/lib/genstage_example/task.ex +++ b/lib/genstage_example/task.ex @@ -1,6 +1,10 @@ defmodule GenstageExample.Task do - def enqueue(status, payload) do - GenstageExample.TaskDBInterface.insert_tasks(status, payload) + def enqueue(list) when is_list(list) do + GenstageExample.TaskDBInterface.insert_tasks(list) + end + + def enqueue(payload) do + GenstageExample.TaskDBInterface.insert_tasks([payload]) end def take(limit) do diff --git a/lib/genstage_example/task_db_interface.ex b/lib/genstage_example/task_db_interface.ex index 479a22f..d7f9382 100644 --- a/lib/genstage_example/task_db_interface.ex +++ b/lib/genstage_example/task_db_interface.ex @@ -1,34 +1,51 @@ defmodule GenstageExample.TaskDBInterface do import Ecto.Query + alias GenstageExample.Repo def take_tasks(limit) do {:ok, {count, events}} = - GenstageExample.Repo.transaction fn -> - ids = GenstageExample.Repo.all waiting(limit) - GenstageExample.Repo.update_all by_ids(ids), [set: [status: "running"]], [returning: [:id, :payload]] - end + GenstageExample.Repo.transaction(fn -> + ids = GenstageExample.Repo.all(waiting(limit)) + + GenstageExample.Repo.update_all(by_ids(ids), set: [status: "running"]) + end) + {count, events} end - def insert_tasks(status, payload) do - GenstageExample.Repo.insert_all "tasks", [ - %{status: status, payload: payload} - ] + def insert_tasks(list) do + Repo.insert_all( + "tasks", + Enum.map(list, fn payload = {_m, _f, _a} -> + %{status: "waiting", payload: construct_payload(payload)} + end) + ) end def update_task_status(id, status) do - GenstageExample.Repo.update_all by_ids([id]), set: [status: status] + Repo.update_all(by_ids([id]), set: [status: status]) end defp by_ids(ids) do - from t in "tasks", where: t.id in ^ids + from(t in "tasks", + where: t.id in ^ids, + select: %{ + id: t.id, + payload: t.payload + } + ) end defp waiting(limit) do - from t in "tasks", + from(t in "tasks", where: t.status == "waiting", limit: ^limit, select: t.id, lock: "FOR UPDATE SKIP LOCKED" + ) + end + + defp construct_payload(mfa = {module, function, args}) do + :erlang.term_to_binary(mfa) end end diff --git a/lib/repo.ex b/lib/repo.ex index ba7d26a..3a69a33 100644 --- a/lib/repo.ex +++ b/lib/repo.ex @@ -1,4 +1,5 @@ defmodule GenstageExample.Repo do use Ecto.Repo, - otp_app: :genstage_example + otp_app: :genstage_example, + adapter: Ecto.Adapters.Postgres end diff --git a/mix.exs b/mix.exs index 59215b7..c271bd6 100644 --- a/mix.exs +++ b/mix.exs @@ -2,21 +2,25 @@ defmodule GenstageExample.Mixfile do use Mix.Project def project do - [app: :genstage_example, - version: "0.1.0", - elixir: "~> 1.3", - build_embedded: Mix.env == :prod, - start_permanent: Mix.env == :prod, - package: package(), - deps: deps()] + [ + app: :genstage_example, + version: "0.1.0", + elixir: "~> 1.3", + build_embedded: Mix.env() == :prod, + start_permanent: Mix.env() == :prod, + package: package(), + deps: deps() + ] end # Configuration for the OTP application # # Type "mix help compile.app" for more information def application do - [applications: [:logger, :postgrex], - mod: {GenstageExample, []}] + [ + extra_applications: [:logger], + mod: {GenstageExample, []} + ] end # Dependencies can be Hex packages: @@ -30,21 +34,25 @@ defmodule GenstageExample.Mixfile do # Type "mix help deps" for more examples and options defp deps do [ - {:ecto, "~> 2.0"}, - {:postgrex, "~> 0.12.1"}, - {:gen_stage, "~> 0.7"}, - {:ex_doc, ">= 0.0.0", only: :dev}, + {:ecto_sql, "~> 3.0"}, + {:postgrex, "~> 0.15.3"}, + {:gen_stage, "~> 1.0"}, + {:ex_doc, ">= 0.0.0", only: :dev} ] end defp package do - [# These are the default files included in the package - name: :postgrex, - files: ["doc", "lib", "priv", "mix.exs", "README*", "README*"], - maintainers: ["Robert Grayson"], - description: "A simple genstage example - a task runner", - licenses: ["DWTFYWPL"], - links: %{"GitHub" => "https://github.com/ybur-yug/genstage_example", - "Docs" => "http://elixirschool.com/"}] + # These are the default files included in the package + [ + name: :postgrex, + files: ["doc", "lib", "priv", "mix.exs", "README*", "README*"], + maintainers: ["Robert Grayson"], + description: "A simple genstage example - a task runner", + licenses: ["DWTFYWPL"], + links: %{ + "GitHub" => "https://github.com/ybur-yug/genstage_example", + "Docs" => "http://elixirschool.com/" + } + ] end end diff --git a/mix.lock b/mix.lock index 5bcc840..725b94c 100644 --- a/mix.lock +++ b/mix.lock @@ -1,9 +1,15 @@ -%{"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, - "db_connection": {:hex, :db_connection, "1.0.0", "63c03e520d54886a66104d34e32397ba960db6e74b596ce221592c07d6a40d8d", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, optional: false]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]}, - "decimal": {:hex, :decimal, "1.2.0", "462960fd71af282e570f7b477f6be56bf8968e68277d4d0b641a635269bf4b0d", [:mix], []}, - "earmark": {:hex, :earmark, "1.0.2", "a0b0904d74ecc14da8bd2e6e0248e1a409a2bc91aade75fcf428125603de3853", [:mix], []}, - "ecto": {:hex, :ecto, "2.0.5", "7f4c79ac41ffba1a4c032b69d7045489f0069c256de606523c65d9f8188e502d", [:mix], [{:db_connection, "~> 1.0-rc.4", [hex: :db_connection, optional: true]}, {:decimal, "~> 1.1.2 or ~> 1.2", [hex: :decimal, optional: false]}, {:mariaex, "~> 0.7.7", [hex: :mariaex, optional: true]}, {:poison, "~> 1.5 or ~> 2.0", [hex: :poison, optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: false]}, {:postgrex, "~> 0.12.0", [hex: :postgrex, optional: true]}, {:sbroker, "~> 1.0-beta", [hex: :sbroker, optional: true]}]}, - "ex_doc": {:hex, :ex_doc, "0.14.3", "e61cec6cf9731d7d23d254266ab06ac1decbb7651c3d1568402ec535d387b6f7", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]}, - "gen_stage": {:hex, :gen_stage, "0.7.0", "d8ab7f294ca0fb7ca001c33a97d25876561f086ecd149689f43c0d2ffccf4fff", [:mix], []}, - "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, - "postgrex": {:hex, :postgrex, "0.12.1", "2f8b46cb3a44dcd42f42938abedbfffe7e103ba4ce810ccbeee8dcf27ca0fb06", [:mix], [{:connection, "~> 1.0", [hex: :connection, optional: false]}, {:db_connection, "~> 1.0-rc.4", [hex: :db_connection, optional: false]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]}} +%{ + "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"}, + "db_connection": {:hex, :db_connection, "2.2.1", "caee17725495f5129cb7faebde001dc4406796f12a62b8949f4ac69315080566", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "2b02ece62d9f983fcd40954e443b7d9e6589664380e5546b2b9b523cd0fb59e1"}, + "decimal": {:hex, :decimal, "1.8.1", "a4ef3f5f3428bdbc0d35374029ffcf4ede8533536fa79896dd450168d9acdf3c", [:mix], [], "hexpm", "3cb154b00225ac687f6cbd4acc4b7960027c757a5152b369923ead9ddbca7aec"}, + "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"}, + "ecto": {:hex, :ecto, "3.3.4", "95b05c82ae91361475e5491c9f3ac47632f940b3f92ae3988ac1aad04989c5bb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "9b96cbb83a94713731461ea48521b178b0e3863d310a39a3948c807266eebd69"}, + "ecto_sql": {:hex, :ecto_sql, "3.3.4", "aa18af12eb875fbcda2f75e608b3bd534ebf020fc4f6448e4672fcdcbb081244", [:mix], [{:db_connection, "~> 2.2", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.4 or ~> 3.3.3", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.3.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.15.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "5eccbdbf92e3c6f213007a82d5dbba4cd9bb659d1a21331f89f408e4c0efd7a8"}, + "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"}, + "gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"}, + "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, + "postgrex": {:hex, :postgrex, "0.15.3", "5806baa8a19a68c4d07c7a624ccdb9b57e89cbc573f1b98099e3741214746ae4", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "4737ce62a31747b4c63c12b20c62307e51bb4fcd730ca0c32c280991e0606c90"}, + "telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"}, +} diff --git a/priv/repo/migrations/20161023200119_add_tasks.exs b/priv/repo/migrations/20161023200119_add_tasks.exs index 0854120..9d7aa27 100644 --- a/priv/repo/migrations/20161023200119_add_tasks.exs +++ b/priv/repo/migrations/20161023200119_add_tasks.exs @@ -3,8 +3,10 @@ defmodule GenstageExample.Repo.Migrations.AddTasks do def change do create table(:tasks) do - add :payload, :binary - add :status, :string + add :payload, :binary, null: false + add :status, :string, default: "waiting", null: false + + timestamps(updated_at: false, inserted_at: false) end end end From 6241135478ccc1fc4b23d1d64d0e9c31ebb65c91 Mon Sep 17 00:00:00 2001 From: Yuriy Zdyrko Date: Wed, 4 Mar 2020 15:11:54 +0200 Subject: [PATCH 2/2] upgrade - small fixes --- README.md | 42 +++++++++++------------------ lib/genstage_example.ex | 23 ++++++++++++++++ lib/genstage_example/application.ex | 14 +--------- lib/genstage_example/runner.ex | 15 ----------- mix.exs | 2 +- 5 files changed, 41 insertions(+), 55 deletions(-) create mode 100644 lib/genstage_example.ex delete mode 100644 lib/genstage_example/runner.ex diff --git a/README.md b/README.md index 8f16435..9d236bd 100644 --- a/README.md +++ b/README.md @@ -643,18 +643,6 @@ This is what we will end up with: opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] Supervisor.start_link(children, opts) end - - def start_later(list) do - GenstageExample.Task.enqueue(list) - notify_producer() - end - - def start_later(module, function, args) do - GenstageExample.Task.enqueue({module, function, args}) - notify_producer() - end - - defdelegate notify_producer(), to: Producer . . . ``` @@ -693,39 +681,41 @@ This new supervisor is run through `Task.Supervisor`, which is built into Elixir We give it a name so it is easily referred to in our GenStage code, `GenstageExample.TaskSupervisor`. Now, we define our children as the concatenation of all these lists. -Next, we have `start_later/3`: + +`/lib/genserver_example.ex` will be used as a public interface to the application. +It inserts tasks into database, and notifies Producer of change. ```elixir -. . . +defmodule GenstageExample do def start_later(list) do GenstageExample.Task.enqueue(list) - notify_producer() + GenstageExample.Producer.notify_producer() end def start_later(module, function, args) do GenstageExample.Task.enqueue({module, function, args}) - notify_producer() + GenstageExample.Producer.notify_producer() end - - defdelegate notify_producer(), to: Producer . . . ``` -We then insert the task as `waiting`, and notify a producer that a task has been inserted to run. -`defdelegate notify_producer()` means method in Producer module will be invoked. -`notify_producer/0` in /lib/producer.ex: +`notify_producer/0` in `/lib/genstage_example/producer.ex` +This method is quite simple. +We cast our producer a message, `:data_inserted`, simply so that it knows what we did. +The message here is arbitrary, but I chose this atom to make the meaning clear. ```elixir . . . def notify_producer do GenStage.cast(@name, :data_inserted) end + + def handle_cast(:data_inserted, state) do + serve_jobs(state) + end . . . ``` -This method is quite simple. -We cast our producer a message, `:data_inserted`, simply so that it knows what we did. -The message here is arbitrary, but I chose this atom to make the meaning clear. ### Producer Setup Our producer doesn't need a ton of work. @@ -910,7 +900,7 @@ defmodule GenstageExample.Consumer do end ``` -### How to run it +## How to run it #### Manually add tasks: @@ -946,5 +936,5 @@ It works and we are storing and running tasks! ```elixir $ iex -S mix -iex> GenstageExample.Runner.run() +iex> GenstageExample.auto_start_later() ``` \ No newline at end of file diff --git a/lib/genstage_example.ex b/lib/genstage_example.ex new file mode 100644 index 0000000..e464d34 --- /dev/null +++ b/lib/genstage_example.ex @@ -0,0 +1,23 @@ +defmodule GenstageExample do + def start_later(list) do + GenstageExample.Task.enqueue(list) + GenstageExample.Producer.notify_producer() + end + + def start_later(module, function, args) do + GenstageExample.Task.enqueue({module, function, args}) + GenstageExample.Producer.notify_producer() + end + + def auto_start_later do + 0..2000 + |> Enum.map(fn i -> + {__MODULE__, :do_work, [i]} + end) + |> start_later() + end + + def do_work(i) do + IO.puts("Running :do_work(#{inspect(i)}) in PID #{inspect(self())}") + end +end diff --git a/lib/genstage_example/application.ex b/lib/genstage_example/application.ex index b7de823..dbca263 100644 --- a/lib/genstage_example/application.ex +++ b/lib/genstage_example/application.ex @@ -1,4 +1,4 @@ -defmodule GenstageExample do +defmodule GenstageExample.Application do use Application alias GenstageExample.{Producer, Consumer, Repo} @@ -24,16 +24,4 @@ defmodule GenstageExample do opts = [strategy: :one_for_one, name: GenstageExample.Supervisor] Supervisor.start_link(children, opts) end - - def start_later(list) do - GenstageExample.Task.enqueue(list) - notify_producer() - end - - def start_later(module, function, args) do - GenstageExample.Task.enqueue({module, function, args}) - notify_producer() - end - - defdelegate notify_producer(), to: Producer end diff --git a/lib/genstage_example/runner.ex b/lib/genstage_example/runner.ex deleted file mode 100644 index 679a105..0000000 --- a/lib/genstage_example/runner.ex +++ /dev/null @@ -1,15 +0,0 @@ -defmodule GenstageExample.Runner do - alias GenstageExample, as: App - - def run do - 0..2000 - |> Enum.map(fn i -> - {__MODULE__, :do_work, [i]} - end) - |> App.start_later() - end - - def do_work(i) do - IO.puts("Running :do_work(#{inspect(i)}) in PID #{inspect(self())}") - end -end diff --git a/mix.exs b/mix.exs index c271bd6..0ee6159 100644 --- a/mix.exs +++ b/mix.exs @@ -19,7 +19,7 @@ defmodule GenstageExample.Mixfile do def application do [ extra_applications: [:logger], - mod: {GenstageExample, []} + mod: {GenstageExample.Application, []} ] end