diff --git a/README.md b/README.md index 5f448d4..78098db 100644 --- a/README.md +++ b/README.md @@ -485,6 +485,35 @@ Durable.provide_input(id, "input_name", data) Durable.list_children(parent_id) ``` +## Mix Tasks + +Durable includes mix tasks for managing workflows from the command line. + +```bash +# Show queue status and workflow summary +mix durable.status + +# List workflow executions (with filters) +mix durable.list # all executions +mix durable.list --status running # filter by status +mix durable.list --workflow MyApp.OrderWorkflow # filter by workflow +mix durable.list --limit 20 --format json # limit results, JSON output + +# Start a workflow +mix durable.run MyApp.OrderWorkflow # no input +mix durable.run MyApp.OrderWorkflow --input '{"id": 123}' # with JSON input +mix durable.run MyApp.OrderWorkflow --queue high_priority # specific queue + +# Cancel a workflow +mix durable.cancel +mix durable.cancel --reason "no longer needed" + +# Clean up old executions +mix durable.cleanup --older-than 30d # completed/failed older than 30 days +mix durable.cleanup --older-than 7d --status completed # only completed, older than 7 days +mix durable.cleanup --older-than 24h --dry-run # preview what would be deleted +``` + ## Guides - [Branching](guides/branching.md) - Conditional flow control diff --git a/lib/mix/helpers.ex b/lib/mix/helpers.ex new file mode 100644 index 0000000..0244104 --- /dev/null +++ b/lib/mix/helpers.ex @@ -0,0 +1,120 @@ +defmodule Durable.Mix.Helpers do + @moduledoc false + + # Shared utilities for Durable mix tasks. + + @doc """ + Ensures the application is started. + """ + def ensure_started do + Mix.Task.run("app.start") + end + + @doc """ + Parses --name option, returns Durable instance name atom. + """ + def get_durable_name(opts) do + case Keyword.get(opts, :name) do + nil -> Durable + name -> Module.concat([name]) + end + end + + @doc """ + Formats rows into an aligned table with headers. + """ + def format_table(rows, headers) do + all = [headers | rows] + + widths = + Enum.reduce(all, List.duplicate(0, length(headers)), fn row, widths -> + row + |> Enum.map(&String.length(to_string(&1))) + |> Enum.zip(widths) + |> Enum.map(fn {a, b} -> max(a, b) end) + end) + + format_row = fn row -> + row + |> Enum.zip(widths) + |> Enum.map_join(" ", fn {val, width} -> + String.pad_trailing(to_string(val), width) + end) + end + + header_line = format_row.(headers) + data_lines = Enum.map(rows, format_row) + [header_line | data_lines] + end + + @doc """ + Truncates a UUID to the first 8 characters. + """ + def truncate_id(nil), do: "—" + + def truncate_id(id) when is_binary(id) do + String.slice(id, 0, 8) + end + + @doc """ + Formats a duration between two datetimes as a human-readable string. + """ + def format_duration(nil, _), do: "—" + def format_duration(_, nil), do: "—" + + def format_duration(started_at, completed_at) do + diff = DateTime.diff(completed_at, started_at, :second) + format_seconds(diff) + end + + @doc """ + Formats a number of seconds into a human-readable duration string. + """ + def format_seconds(seconds) when seconds < 60, do: "#{seconds}s" + + def format_seconds(seconds) when seconds < 3600 do + m = div(seconds, 60) + s = rem(seconds, 60) + if s == 0, do: "#{m}m", else: "#{m}m #{s}s" + end + + def format_seconds(seconds) do + h = div(seconds, 3600) + m = div(rem(seconds, 3600), 60) + if m == 0, do: "#{h}h", else: "#{h}h #{m}m" + end + + @doc """ + Formats a DateTime as "YYYY-MM-DD HH:MM:SS" or "—" for nil. + """ + def format_datetime(nil), do: "—" + + def format_datetime(%DateTime{} = dt) do + Calendar.strftime(dt, "%Y-%m-%d %H:%M:%S") + end + + @doc """ + Formats an integer with comma separators. + """ + def format_number(n) when is_integer(n) and n < 0 do + "-" <> format_number(-n) + end + + def format_number(n) when is_integer(n) do + n + |> Integer.to_string() + |> String.graphemes() + |> Enum.reverse() + |> Enum.chunk_every(3) + |> Enum.map_join(",", &Enum.reverse/1) + |> String.reverse() + |> String.reverse() + end + + @doc """ + Strips the "Elixir." prefix from a module name string. + """ + def strip_elixir_prefix(module_str) when is_binary(module_str) do + String.replace_prefix(module_str, "Elixir.", "") + end +end diff --git a/lib/mix/tasks/durable.cancel.ex b/lib/mix/tasks/durable.cancel.ex new file mode 100644 index 0000000..e873afd --- /dev/null +++ b/lib/mix/tasks/durable.cancel.ex @@ -0,0 +1,60 @@ +defmodule Mix.Tasks.Durable.Cancel do + @shortdoc "Cancels a workflow execution" + + @moduledoc """ + Cancels a running, pending, or waiting workflow execution. + + ## Usage + + mix durable.cancel WORKFLOW_ID [options] + + ## Options + + * `--reason REASON` - Cancellation reason + * `--name NAME` - The Durable instance name (default: Durable) + + ## Examples + + mix durable.cancel abc12345-... + mix durable.cancel abc12345-... --reason "User requested cancellation" + """ + + use Mix.Task + + alias Durable.Executor + alias Durable.Mix.Helpers + + @impl Mix.Task + def run(args) do + Helpers.ensure_started() + + {opts, positional, _} = + OptionParser.parse(args, strict: [reason: :string, name: :string]) + + case positional do + [workflow_id | _] -> + cancel_workflow(workflow_id, opts) + + [] -> + Mix.shell().error("Usage: mix durable.cancel WORKFLOW_ID [--reason REASON]") + end + end + + defp cancel_workflow(workflow_id, opts) do + durable_name = Helpers.get_durable_name(opts) + reason = Keyword.get(opts, :reason) + + case Executor.cancel_workflow(workflow_id, reason, durable: durable_name) do + :ok -> + Mix.shell().info("Workflow #{Helpers.truncate_id(workflow_id)} cancelled.") + + {:error, :not_found} -> + Mix.shell().error("Workflow #{workflow_id} not found.") + + {:error, :already_completed} -> + Mix.shell().error( + "Workflow #{workflow_id} has already completed and cannot be cancelled." + ) + end + end +end diff --git a/lib/mix/tasks/durable.cleanup.ex b/lib/mix/tasks/durable.cleanup.ex new file mode 100644 index 0000000..a37a598 --- /dev/null +++ b/lib/mix/tasks/durable.cleanup.ex @@ -0,0 +1,165 @@ +defmodule Mix.Tasks.Durable.Cleanup do + @shortdoc "Deletes old workflow executions" + + @moduledoc """ + Deletes old workflow executions from the database. + + Cascade deletes handle associated step executions, pending inputs, and events. + + ## Usage + + mix durable.cleanup --older-than DURATION [options] + + ## Options + + * `--older-than DURATION` - Required. Delete executions older than this duration. + Supports: `30d` (days), `24h` (hours), `60m` (minutes) + * `--status STATUS` - Only delete executions with this status (default: completed, failed). + Can be specified multiple times. + * `--dry-run` - Show how many records would be deleted without deleting + * `--batch-size N` - Number of records to delete per batch (default: 1000) + * `--name NAME` - The Durable instance name (default: Durable) + + ## Examples + + mix durable.cleanup --older-than 30d + mix durable.cleanup --older-than 24h --status completed --dry-run + mix durable.cleanup --older-than 7d --batch-size 500 + """ + + use Mix.Task + + import Ecto.Query + + alias Durable.Config + alias Durable.Mix.Helpers + alias Durable.Repo + alias Durable.Storage.Schemas.WorkflowExecution + + @default_statuses [:completed, :failed] + @default_batch_size 1000 + + @impl Mix.Task + def run(args) do + Helpers.ensure_started() + + {opts, _, _} = + OptionParser.parse(args, + strict: [ + older_than: :string, + status: [:string, :keep], + dry_run: :boolean, + batch_size: :integer, + name: :string + ] + ) + + with {:ok, cutoff} <- parse_older_than(opts), + {:ok, statuses} <- parse_statuses(opts) do + durable_name = Helpers.get_durable_name(opts) + config = Config.get(durable_name) + dry_run = Keyword.get(opts, :dry_run, false) + batch_size = Keyword.get(opts, :batch_size, @default_batch_size) + + if dry_run do + run_dry(config, cutoff, statuses) + else + run_cleanup(config, cutoff, statuses, batch_size) + end + end + end + + defp parse_older_than(opts) do + case Keyword.get(opts, :older_than) do + nil -> + Mix.shell().error("--older-than is required. Example: --older-than 30d") + :error + + duration_str -> + parse_duration(duration_str) + end + end + + defp parse_duration(str) do + case Regex.run(~r/^(\d+)([dhm])$/, str) do + [_, num_str, unit] -> + num = String.to_integer(num_str) + seconds = duration_to_seconds(num, unit) + cutoff = DateTime.add(DateTime.utc_now(), -seconds, :second) + {:ok, cutoff} + + nil -> + Mix.shell().error( + "Invalid duration: #{str}. Use format like 30d (days), 24h (hours), or 60m (minutes)." + ) + + :error + end + end + + defp duration_to_seconds(num, "d"), do: num * 86_400 + defp duration_to_seconds(num, "h"), do: num * 3_600 + defp duration_to_seconds(num, "m"), do: num * 60 + + defp parse_statuses(opts) do + case Keyword.get_values(opts, :status) do + [] -> + {:ok, @default_statuses} + + status_strings -> + statuses = + Enum.map(status_strings, fn s -> + String.to_existing_atom(s) + end) + + {:ok, statuses} + end + rescue + ArgumentError -> + Mix.shell().error("Invalid status provided.") + :error + end + + defp run_dry(config, cutoff, statuses) do + count = count_matching(config, cutoff, statuses) + status_str = Enum.map_join(statuses, ", ", &to_string/1) + + Mix.shell().info( + "Dry run: #{Helpers.format_number(count)} executions would be deleted " <> + "(status: #{status_str}, older than #{Helpers.format_datetime(cutoff)})." + ) + end + + defp run_cleanup(config, cutoff, statuses, batch_size) do + total = do_batch_delete(config, cutoff, statuses, batch_size, 0) + Mix.shell().info("Deleted #{Helpers.format_number(total)} workflow executions.") + end + + defp do_batch_delete(config, cutoff, statuses, batch_size, acc) do + ids_query = + from(w in WorkflowExecution, + where: w.status in ^statuses and w.inserted_at < ^cutoff, + select: w.id, + limit: ^batch_size + ) + + delete_query = from(w in WorkflowExecution, where: w.id in subquery(ids_query)) + {deleted, _} = Repo.delete_all(config, delete_query) + + if deleted > 0 do + do_batch_delete(config, cutoff, statuses, batch_size, acc + deleted) + else + acc + end + end + + defp count_matching(config, cutoff, statuses) do + query = + from(w in WorkflowExecution, + where: w.status in ^statuses and w.inserted_at < ^cutoff, + select: count(w.id) + ) + + Repo.one(config, query) + end +end diff --git a/lib/mix/tasks/durable.list.ex b/lib/mix/tasks/durable.list.ex new file mode 100644 index 0000000..9b15cd4 --- /dev/null +++ b/lib/mix/tasks/durable.list.ex @@ -0,0 +1,167 @@ +defmodule Mix.Tasks.Durable.List do + @shortdoc "Lists workflow executions" + + @moduledoc """ + Lists workflow executions with optional filters. + + ## Usage + + mix durable.list [options] + + ## Options + + * `--status STATUS` - Filter by status (pending, running, completed, failed, etc.) + * `--workflow MODULE` - Filter by workflow module + * `--queue QUEUE` - Filter by queue name + * `--since DATETIME` - Show executions since ISO 8601 datetime + * `--until DATETIME` - Show executions until ISO 8601 datetime + * `--limit N` - Maximum number of results (default: 50) + * `--format FORMAT` - Output format: table or json (default: table) + * `--name NAME` - The Durable instance name (default: Durable) + """ + + use Mix.Task + + alias Durable.Mix.Helpers + alias Durable.Query + + @valid_statuses ~w(pending running completed failed waiting cancelled + compensating compensated compensation_failed) + + @impl Mix.Task + def run(args) do + Helpers.ensure_started() + + {opts, _, _} = + OptionParser.parse(args, + strict: [ + status: :string, + workflow: :string, + queue: :string, + since: :string, + until: :string, + limit: :integer, + format: :string, + name: :string + ] + ) + + durable_name = Helpers.get_durable_name(opts) + + case build_filters(opts, durable_name) do + {:ok, filters} -> + executions = Query.list_executions(filters) + format = Keyword.get(opts, :format, "table") + output(executions, format) + + {:error, message} -> + Mix.shell().error(message) + end + end + + defp build_filters(opts, durable_name) do + filters = [durable: durable_name] + + with {:ok, filters} <- add_status(filters, opts), + {:ok, filters} <- add_workflow(filters, opts), + {:ok, filters} <- add_queue(filters, opts), + {:ok, filters} <- add_since(filters, opts), + {:ok, filters} <- add_until(filters, opts) do + limit = Keyword.get(opts, :limit, 50) + {:ok, Keyword.put(filters, :limit, limit)} + end + end + + defp add_status(filters, opts) do + case Keyword.get(opts, :status) do + nil -> + {:ok, filters} + + status_str -> + if status_str in @valid_statuses do + {:ok, Keyword.put(filters, :status, String.to_existing_atom(status_str))} + else + {:error, + "Invalid status: #{status_str}. Valid statuses: #{Enum.join(@valid_statuses, ", ")}"} + end + end + end + + defp add_workflow(filters, opts) do + case Keyword.get(opts, :workflow) do + nil -> {:ok, filters} + mod_str -> {:ok, Keyword.put(filters, :workflow, Module.concat([mod_str]))} + end + end + + defp add_queue(filters, opts) do + case Keyword.get(opts, :queue) do + nil -> {:ok, filters} + queue -> {:ok, Keyword.put(filters, :queue, queue)} + end + end + + defp add_since(filters, opts) do + parse_datetime(filters, opts, :since, :from) + end + + defp add_until(filters, opts) do + parse_datetime(filters, opts, :until, :to) + end + + defp parse_datetime(filters, opts, opt_key, filter_key) do + case Keyword.get(opts, opt_key) do + nil -> + {:ok, filters} + + dt_str -> + case DateTime.from_iso8601(dt_str) do + {:ok, dt, _offset} -> + {:ok, Keyword.put(filters, filter_key, dt)} + + {:error, _} -> + {:error, "Invalid datetime for --#{opt_key}: #{dt_str}. Use ISO 8601 format."} + end + end + end + + defp output(executions, "json") do + data = + Enum.map(executions, fn exec -> + %{ + id: exec.id, + workflow_module: exec.workflow_module, + workflow_name: exec.workflow_name, + status: exec.status, + queue: exec.queue, + started_at: exec.started_at && DateTime.to_iso8601(exec.started_at), + completed_at: exec.completed_at && DateTime.to_iso8601(exec.completed_at) + } + end) + + Mix.shell().info(Jason.encode!(data, pretty: true)) + end + + defp output(executions, _table) do + if executions == [] do + Mix.shell().info("No workflow executions found.") + else + rows = + Enum.map(executions, fn exec -> + [ + Helpers.truncate_id(exec.id), + Helpers.strip_elixir_prefix(exec.workflow_module), + to_string(exec.status), + exec.queue, + Helpers.format_datetime(exec.started_at), + Helpers.format_duration(exec.started_at, exec.completed_at) + ] + end) + + headers = ["ID", "Workflow", "Status", "Queue", "Started", "Duration"] + + Helpers.format_table(rows, headers) + |> Enum.each(fn line -> Mix.shell().info(line) end) + end + end +end diff --git a/lib/mix/tasks/durable.run.ex b/lib/mix/tasks/durable.run.ex new file mode 100644 index 0000000..ddae75d --- /dev/null +++ b/lib/mix/tasks/durable.run.ex @@ -0,0 +1,132 @@ +defmodule Mix.Tasks.Durable.Run do + @shortdoc "Starts a workflow execution" + + @moduledoc """ + Starts a workflow execution. + + ## Usage + + mix durable.run MODULE [options] + + ## Options + + * `--input JSON` - JSON input for the workflow (must be an object) + * `--workflow NAME` - The workflow name within the module + * `--queue QUEUE` - The queue to run on (default: "default") + * `--priority N` - Priority level (default: 0) + * `--name NAME` - The Durable instance name (default: Durable) + + ## Examples + + mix durable.run MyApp.OrderWorkflow + mix durable.run MyApp.OrderWorkflow --input '{"order_id": 123}' + mix durable.run MyApp.OrderWorkflow --workflow process_order --queue high_priority + """ + + use Mix.Task + + alias Durable.Mix.Helpers + + @impl Mix.Task + def run(args) do + Helpers.ensure_started() + + {opts, positional, _} = + OptionParser.parse(args, + strict: [ + input: :string, + workflow: :string, + queue: :string, + priority: :integer, + name: :string + ] + ) + + case positional do + [module_str | _] -> + start_workflow(module_str, opts) + + [] -> + Mix.shell().error( + "Usage: mix durable.run MODULE [--input JSON] [--workflow NAME] [--queue QUEUE]" + ) + end + end + + defp start_workflow(module_str, opts) do + durable_name = Helpers.get_durable_name(opts) + module = Module.concat([module_str]) + + with :ok <- validate_module(module), + {:ok, input} <- parse_input(opts), + {:ok, start_opts} <- build_opts(opts, durable_name) do + case Durable.Executor.start_workflow(module, input, start_opts) do + {:ok, workflow_id} -> + Mix.shell().info("Workflow started: #{workflow_id}") + + {:error, reason} -> + Mix.shell().error("Failed to start workflow: #{inspect(reason)}") + end + end + end + + defp validate_module(module) do + cond do + not Code.ensure_loaded?(module) -> + Mix.shell().error("Module #{inspect(module)} not found.") + :error + + not function_exported?(module, :__workflows__, 0) -> + Mix.shell().error("Module #{inspect(module)} is not a Durable workflow.") + :error + + true -> + :ok + end + end + + defp parse_input(opts) do + case Keyword.get(opts, :input) do + nil -> + {:ok, %{}} + + json_str -> + case Jason.decode(json_str) do + {:ok, input} when is_map(input) -> + {:ok, input} + + {:ok, _} -> + Mix.shell().error("Input must be a JSON object, not an array or scalar.") + :error + + {:error, %Jason.DecodeError{} = error} -> + Mix.shell().error("Invalid JSON input: #{Exception.message(error)}") + :error + end + end + end + + defp build_opts(opts, durable_name) do + start_opts = [durable: durable_name] + + start_opts = + case Keyword.get(opts, :workflow) do + nil -> start_opts + name -> Keyword.put(start_opts, :workflow, name) + end + + start_opts = + case Keyword.get(opts, :queue) do + nil -> start_opts + queue -> Keyword.put(start_opts, :queue, queue) + end + + start_opts = + case Keyword.get(opts, :priority) do + nil -> start_opts + priority -> Keyword.put(start_opts, :priority, priority) + end + + {:ok, start_opts} + end +end diff --git a/lib/mix/tasks/durable.status.ex b/lib/mix/tasks/durable.status.ex new file mode 100644 index 0000000..dcb8f7f --- /dev/null +++ b/lib/mix/tasks/durable.status.ex @@ -0,0 +1,110 @@ +defmodule Mix.Tasks.Durable.Status do + @shortdoc "Shows Durable queue status and workflow summary" + + @moduledoc """ + Shows Durable queue status and workflow summary. + + ## Usage + + mix durable.status [--name NAME] + + ## Options + + * `--name` - The Durable instance name (default: Durable) + """ + + use Mix.Task + + alias Durable.Mix.Helpers + alias Durable.Query + alias Durable.Queue.Manager + + @statuses [ + :pending, + :running, + :completed, + :failed, + :waiting, + :cancelled, + :compensating, + :compensated, + :compensation_failed + ] + + @impl Mix.Task + def run(args) do + Helpers.ensure_started() + {opts, _, _} = OptionParser.parse(args, strict: [name: :string]) + durable_name = Helpers.get_durable_name(opts) + + print_queue_status(durable_name) + Mix.shell().info("") + print_workflow_summary(durable_name) + end + + defp print_queue_status(durable_name) do + queues = Manager.queues(durable_name) + + rows = + Enum.map(queues, fn queue -> + db_stats = Manager.stats(durable_name, queue) + poller_status = get_poller_status(durable_name, queue) + + [ + queue, + poller_status[:concurrency] || "N/A", + poller_status[:active_jobs] || "N/A", + format_paused(poller_status[:paused]), + db_stats.pending, + db_stats.running + ] + end) + + headers = ["Queue", "Concurrency", "Active", "Paused", "Pending", "Running"] + + Mix.shell().info("Queue Status (#{inspect(durable_name)})") + + Helpers.format_table(rows, headers) + |> Enum.each(fn line -> Mix.shell().info(" #{line}") end) + end + + defp get_poller_status(durable_name, queue) do + status = Manager.status(durable_name, queue) + + %{ + concurrency: status.concurrency, + active_jobs: status.active_jobs, + paused: status.paused + } + rescue + _ -> %{concurrency: nil, active_jobs: nil, paused: nil} + catch + :exit, _ -> %{concurrency: nil, active_jobs: nil, paused: nil} + end + + defp format_paused(nil), do: "N/A" + defp format_paused(true), do: "yes" + defp format_paused(false), do: "no" + + defp print_workflow_summary(durable_name) do + rows = + @statuses + |> Enum.map(fn status -> + count = Query.count_executions(status: status, durable: durable_name) + {status, count} + end) + |> Enum.filter(fn {_, count} -> count > 0 end) + |> Enum.map(fn {status, count} -> + [to_string(status), Helpers.format_number(count)] + end) + + Mix.shell().info("Workflow Summary") + + if rows == [] do + Mix.shell().info(" No workflow executions found.") + else + Helpers.format_table(rows, ["Status", "Count"]) + |> Enum.each(fn line -> Mix.shell().info(" #{line}") end) + end + end +end diff --git a/mix.exs b/mix.exs index a7a0e5e..ab96da9 100644 --- a/mix.exs +++ b/mix.exs @@ -80,6 +80,15 @@ defmodule Durable.MixProject do "guides/orchestration.md", "guides/parallel.md", "guides/waiting.md" + ], + groups_for_modules: [ + "Mix Tasks": [ + Mix.Tasks.Durable.Status, + Mix.Tasks.Durable.List, + Mix.Tasks.Durable.Run, + Mix.Tasks.Durable.Cancel, + Mix.Tasks.Durable.Cleanup + ] ] ] end diff --git a/test/mix/tasks/durable_cancel_test.exs b/test/mix/tasks/durable_cancel_test.exs new file mode 100644 index 0000000..5b82938 --- /dev/null +++ b/test/mix/tasks/durable_cancel_test.exs @@ -0,0 +1,81 @@ +defmodule Mix.Tasks.Durable.CancelTest do + use Durable.DataCase, async: false + + alias Durable.Storage.Schemas.WorkflowExecution + alias Mix.Tasks.Durable.Cancel, as: CancelTask + + setup do + Mix.shell(Mix.Shell.Process) + on_exit(fn -> Mix.shell(Mix.Shell.IO) end) + end + + describe "run/1" do + test "cancels a pending workflow" do + execution = insert_execution(status: :pending) + + CancelTask.run([execution.id]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "cancelled" + + updated = Durable.TestRepo.get!(WorkflowExecution, execution.id) + assert updated.status == :cancelled + end + + test "cancels with a reason" do + execution = insert_execution(status: :running) + + CancelTask.run([execution.id, "--reason", "Testing cancellation"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "cancelled" + + updated = Durable.TestRepo.get!(WorkflowExecution, execution.id) + assert updated.status == :cancelled + assert updated.error["message"] == "Testing cancellation" + end + + test "shows error for non-existent workflow" do + fake_id = Ecto.UUID.generate() + + CancelTask.run([fake_id]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "not found" + end + + test "shows error for already completed workflow" do + execution = insert_execution(status: :completed) + + CancelTask.run([execution.id]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "already completed" + end + + test "shows usage when no workflow ID provided" do + CancelTask.run([]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "Usage:" + end + end + + # Helpers + + defp insert_execution(opts) do + attrs = %{ + workflow_module: "TestWorkflow", + workflow_name: "test", + status: Keyword.get(opts, :status, :pending), + queue: "default", + priority: 0, + input: %{}, + context: %{} + } + + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Durable.TestRepo.insert!() + end +end diff --git a/test/mix/tasks/durable_cleanup_test.exs b/test/mix/tasks/durable_cleanup_test.exs new file mode 100644 index 0000000..4591e06 --- /dev/null +++ b/test/mix/tasks/durable_cleanup_test.exs @@ -0,0 +1,122 @@ +defmodule Mix.Tasks.Durable.CleanupTest do + use Durable.DataCase, async: false + + alias Durable.Storage.Schemas.WorkflowExecution + alias Mix.Tasks.Durable.Cleanup, as: CleanupTask + + setup do + Mix.shell(Mix.Shell.Process) + on_exit(fn -> Mix.shell(Mix.Shell.IO) end) + end + + describe "run/1" do + test "dry run counts without deleting" do + insert_execution(status: :completed, days_ago: 31) + insert_execution(status: :completed, days_ago: 31) + + CleanupTask.run(["--older-than", "30d", "--dry-run"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Dry run" + assert msg =~ "2" + + # Verify nothing was deleted + assert Durable.TestRepo.aggregate(WorkflowExecution, :count) == 2 + end + + test "deletes old completed and failed executions" do + old_completed = insert_execution(status: :completed, days_ago: 31) + old_failed = insert_execution(status: :failed, days_ago: 31) + _recent = insert_execution(status: :completed, days_ago: 1) + _running = insert_execution(status: :running, days_ago: 31) + + CleanupTask.run(["--older-than", "30d"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Deleted" + assert msg =~ "2" + + # Verify old completed/failed are gone + assert Durable.TestRepo.get(WorkflowExecution, old_completed.id) == nil + assert Durable.TestRepo.get(WorkflowExecution, old_failed.id) == nil + + # Verify recent and running are preserved + assert Durable.TestRepo.aggregate(WorkflowExecution, :count) == 2 + end + + test "respects --status filter" do + insert_execution(status: :completed, days_ago: 31) + insert_execution(status: :failed, days_ago: 31) + + CleanupTask.run(["--older-than", "30d", "--status", "completed"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Deleted" + assert msg =~ "1" + + # Failed should still exist + assert Durable.TestRepo.aggregate(WorkflowExecution, :count) == 1 + end + + test "requires --older-than flag" do + CleanupTask.run([]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "--older-than is required" + end + + test "preserves recent data" do + insert_execution(status: :completed, days_ago: 1) + + CleanupTask.run(["--older-than", "30d"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Deleted" + assert msg =~ "0" + + assert Durable.TestRepo.aggregate(WorkflowExecution, :count) == 1 + end + + test "handles hour duration format" do + insert_execution(status: :completed, days_ago: 2) + + CleanupTask.run(["--older-than", "24h"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Deleted" + assert msg =~ "1" + end + + test "handles minute duration format" do + insert_execution(status: :completed, days_ago: 1) + + CleanupTask.run(["--older-than", "60m"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Deleted" + assert msg =~ "1" + end + end + + # Helpers + + defp insert_execution(opts) do + days_ago = Keyword.get(opts, :days_ago, 0) + inserted_at = DateTime.add(DateTime.utc_now(), -days_ago * 86_400, :second) + + attrs = %{ + workflow_module: "TestWorkflow", + workflow_name: "test", + status: Keyword.get(opts, :status, :pending), + queue: "default", + priority: 0, + input: %{}, + context: %{} + } + + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Ecto.Changeset.force_change(:inserted_at, inserted_at) + |> Durable.TestRepo.insert!() + end +end diff --git a/test/mix/tasks/durable_list_test.exs b/test/mix/tasks/durable_list_test.exs new file mode 100644 index 0000000..a425428 --- /dev/null +++ b/test/mix/tasks/durable_list_test.exs @@ -0,0 +1,137 @@ +defmodule Mix.Tasks.Durable.ListTest do + use Durable.DataCase, async: false + + alias Durable.Storage.Schemas.WorkflowExecution + alias Mix.Tasks.Durable.List, as: ListTask + + setup do + Mix.shell(Mix.Shell.Process) + on_exit(fn -> Mix.shell(Mix.Shell.IO) end) + end + + describe "run/1" do + test "lists executions in table format" do + insert_execution(workflow_name: "order", status: :completed) + insert_execution(workflow_name: "payment", status: :running) + + ListTask.run([]) + + output = collect_all_output() + assert output =~ "TestWorkflow" + assert output =~ "completed" + assert output =~ "running" + end + + test "shows no results message when empty" do + ListTask.run([]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "No workflow executions found." + end + + test "filters by status" do + insert_execution(workflow_name: "done", status: :completed) + insert_execution(workflow_name: "active", status: :running) + + ListTask.run(["--status", "completed"]) + + output = collect_all_output() + assert output =~ "completed" + refute output =~ "running" + end + + test "filters by queue" do + insert_execution(workflow_name: "q1", queue: "default") + insert_execution(workflow_name: "q2", queue: "high_priority") + + ListTask.run(["--queue", "high_priority"]) + + output = collect_all_output() + assert output =~ "high_priority" + end + + test "respects --limit" do + for i <- 1..5 do + insert_execution(workflow_name: "wf_#{i}", status: :completed) + end + + ListTask.run(["--limit", "2"]) + + output = collect_all_output() + # Should have header + 2 data rows + lines = output |> String.split("\n") |> Enum.filter(&(&1 =~ "completed")) + assert length(lines) == 2 + end + + test "outputs JSON format" do + insert_execution(workflow_name: "json_test", status: :completed) + + ListTask.run(["--format", "json"]) + + output = collect_all_output() + decoded = Jason.decode!(output) + assert is_list(decoded) + assert length(decoded) == 1 + assert hd(decoded)["workflow_name"] == "json_test" + end + + test "shows error for invalid status" do + ListTask.run(["--status", "invalid_status"]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "Invalid status" + assert msg =~ "pending" + end + + test "filters by --since" do + old = DateTime.add(DateTime.utc_now(), -86_400, :second) + recent = DateTime.utc_now() + + insert_execution(workflow_name: "old", inserted_at: old) + insert_execution(workflow_name: "recent", inserted_at: recent) + + since = DateTime.to_iso8601(DateTime.add(DateTime.utc_now(), -3_600, :second)) + ListTask.run(["--since", since]) + + output = collect_all_output() + assert output =~ "recent" or output =~ "TestWorkflow" + end + end + + # Helpers + + defp insert_execution(opts) do + attrs = %{ + workflow_module: "TestWorkflow", + workflow_name: Keyword.get(opts, :workflow_name, "test"), + status: Keyword.get(opts, :status, :pending), + queue: Keyword.get(opts, :queue, "default"), + priority: 0, + input: %{}, + context: %{} + } + + attrs = + case Keyword.get(opts, :inserted_at) do + nil -> attrs + dt -> Map.put(attrs, :inserted_at, dt) + end + + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Durable.TestRepo.insert!() + end + + defp collect_all_output do + collect_all_output("") + end + + defp collect_all_output(acc) do + receive do + {:mix_shell, :info, [line]} -> collect_all_output(acc <> "\n" <> line) + {:mix_shell, :error, [line]} -> collect_all_output(acc <> "\n" <> line) + after + 100 -> acc + end + end +end diff --git a/test/mix/tasks/durable_run_test.exs b/test/mix/tasks/durable_run_test.exs new file mode 100644 index 0000000..5d53449 --- /dev/null +++ b/test/mix/tasks/durable_run_test.exs @@ -0,0 +1,73 @@ +defmodule Mix.Tasks.Durable.RunTest do + use Durable.DataCase, async: false + + alias Mix.Tasks.Durable.Run, as: RunTask + + setup do + Mix.shell(Mix.Shell.Process) + on_exit(fn -> Mix.shell(Mix.Shell.IO) end) + end + + describe "run/1" do + test "starts a valid workflow" do + RunTask.run(["Durable.TestWorkflows.SimpleWorkflow"]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Workflow started:" + end + + test "starts workflow with JSON input" do + RunTask.run([ + "Durable.TestWorkflows.SimpleWorkflow", + "--input", + ~s({"key": "value"}) + ]) + + assert_received {:mix_shell, :info, [msg]} + assert msg =~ "Workflow started:" + end + + test "shows error for non-existent module" do + RunTask.run(["NonExistent.Module"]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "not found" + end + + test "shows error for non-workflow module" do + RunTask.run(["Enum"]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "not a Durable workflow" + end + + test "shows error for invalid JSON input" do + RunTask.run([ + "Durable.TestWorkflows.SimpleWorkflow", + "--input", + "not json" + ]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "Invalid JSON" + end + + test "shows error for JSON array input" do + RunTask.run([ + "Durable.TestWorkflows.SimpleWorkflow", + "--input", + "[1, 2, 3]" + ]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "must be a JSON object" + end + + test "shows usage when no module provided" do + RunTask.run([]) + + assert_received {:mix_shell, :error, [msg]} + assert msg =~ "Usage:" + end + end +end diff --git a/test/mix/tasks/durable_status_test.exs b/test/mix/tasks/durable_status_test.exs new file mode 100644 index 0000000..c9674c7 --- /dev/null +++ b/test/mix/tasks/durable_status_test.exs @@ -0,0 +1,85 @@ +defmodule Mix.Tasks.Durable.StatusTest do + use Durable.DataCase, async: false + + alias Durable.Storage.Schemas.WorkflowExecution + alias Mix.Tasks.Durable.Status, as: StatusTask + + setup do + Mix.shell(Mix.Shell.Process) + on_exit(fn -> Mix.shell(Mix.Shell.IO) end) + end + + describe "run/1" do + test "shows queue status and workflow summary with empty database" do + StatusTask.run([]) + + assert_received {:mix_shell, :info, [queue_header]} + assert queue_header =~ "Queue Status" + + # Should show the default queue + assert_receive {:mix_shell, :info, [queue_line]} + assert queue_line =~ "Queue" or queue_line =~ "default" + + # Should eventually show workflow summary + collect_and_assert_output("Workflow Summary") + end + + test "shows counts for seeded data" do + insert_execution(status: :completed) + insert_execution(status: :completed) + insert_execution(status: :failed) + insert_execution(status: :running) + + StatusTask.run([]) + + output = collect_all_output() + assert output =~ "completed" + assert output =~ "2" + assert output =~ "failed" + assert output =~ "running" + end + + test "shows zero when no executions exist for a status" do + StatusTask.run([]) + + output = collect_all_output() + assert output =~ "Workflow Summary" + assert output =~ "No workflow executions found." + end + end + + # Helpers + + defp insert_execution(opts) do + attrs = %{ + workflow_module: "TestWorkflow", + workflow_name: Keyword.get(opts, :workflow_name, "test"), + status: Keyword.get(opts, :status, :pending), + queue: Keyword.get(opts, :queue, "default"), + priority: 0, + input: %{}, + context: %{} + } + + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Durable.TestRepo.insert!() + end + + defp collect_all_output do + collect_all_output("") + end + + defp collect_all_output(acc) do + receive do + {:mix_shell, :info, [line]} -> collect_all_output(acc <> "\n" <> line) + after + 100 -> acc + end + end + + defp collect_and_assert_output(expected) do + output = collect_all_output() + assert output =~ expected + end +end diff --git a/test/support/test_workflows.ex b/test/support/test_workflows.ex new file mode 100644 index 0000000..6d0b67f --- /dev/null +++ b/test/support/test_workflows.ex @@ -0,0 +1,10 @@ +defmodule Durable.TestWorkflows.SimpleWorkflow do + @moduledoc false + use Durable + + workflow "simple" do + step(:hello, fn _data -> + {:ok, %{message: "hello"}} + end) + end +end