Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,35 @@ Durable.provide_input(id, "input_name", data)
Durable.list_children(parent_id)
```

## Mix Tasks

Durable includes mix tasks for managing workflows from the command line.

```bash
# Show queue status and workflow summary
mix durable.status

# List workflow executions (with filters)
mix durable.list # all executions
mix durable.list --status running # filter by status
mix durable.list --workflow MyApp.OrderWorkflow # filter by workflow
mix durable.list --limit 20 --format json # limit results, JSON output

# Start a workflow
mix durable.run MyApp.OrderWorkflow # no input
mix durable.run MyApp.OrderWorkflow --input '{"id": 123}' # with JSON input
mix durable.run MyApp.OrderWorkflow --queue high_priority # specific queue

# Cancel a workflow
mix durable.cancel <execution_id>
mix durable.cancel <execution_id> --reason "no longer needed"

# Clean up old executions
mix durable.cleanup --older-than 30d # completed/failed older than 30 days
mix durable.cleanup --older-than 7d --status completed # only completed, older than 7 days
mix durable.cleanup --older-than 24h --dry-run # preview what would be deleted
```

## Guides

- [Branching](guides/branching.md) - Conditional flow control
Expand Down
120 changes: 120 additions & 0 deletions lib/mix/helpers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
defmodule Durable.Mix.Helpers do
@moduledoc false

# Shared utilities for Durable mix tasks.

@doc """
Ensures the application is started.
"""
def ensure_started do
Mix.Task.run("app.start")
end

@doc """
Parses --name option, returns Durable instance name atom.
"""
def get_durable_name(opts) do
case Keyword.get(opts, :name) do
nil -> Durable
name -> Module.concat([name])
end
end

@doc """
Formats rows into an aligned table with headers.
"""
def format_table(rows, headers) do
all = [headers | rows]

widths =
Enum.reduce(all, List.duplicate(0, length(headers)), fn row, widths ->
row
|> Enum.map(&String.length(to_string(&1)))
|> Enum.zip(widths)
|> Enum.map(fn {a, b} -> max(a, b) end)
end)

format_row = fn row ->
row
|> Enum.zip(widths)
|> Enum.map_join(" ", fn {val, width} ->
String.pad_trailing(to_string(val), width)
end)
end

header_line = format_row.(headers)
data_lines = Enum.map(rows, format_row)
[header_line | data_lines]
end

@doc """
Truncates a UUID to the first 8 characters.
"""
def truncate_id(nil), do: "—"

def truncate_id(id) when is_binary(id) do
String.slice(id, 0, 8)
end

@doc """
Formats a duration between two datetimes as a human-readable string.
"""
def format_duration(nil, _), do: "—"
def format_duration(_, nil), do: "—"

def format_duration(started_at, completed_at) do
diff = DateTime.diff(completed_at, started_at, :second)
format_seconds(diff)
end

@doc """
Formats a number of seconds into a human-readable duration string.
"""
def format_seconds(seconds) when seconds < 60, do: "#{seconds}s"

def format_seconds(seconds) when seconds < 3600 do
m = div(seconds, 60)
s = rem(seconds, 60)
if s == 0, do: "#{m}m", else: "#{m}m #{s}s"
end

def format_seconds(seconds) do
h = div(seconds, 3600)
m = div(rem(seconds, 3600), 60)
if m == 0, do: "#{h}h", else: "#{h}h #{m}m"
end

@doc """
Formats a DateTime as "YYYY-MM-DD HH:MM:SS" or "—" for nil.
"""
def format_datetime(nil), do: "—"

def format_datetime(%DateTime{} = dt) do
Calendar.strftime(dt, "%Y-%m-%d %H:%M:%S")
end

@doc """
Formats an integer with comma separators.
"""
def format_number(n) when is_integer(n) and n < 0 do
"-" <> format_number(-n)
end

def format_number(n) when is_integer(n) do
n
|> Integer.to_string()
|> String.graphemes()
|> Enum.reverse()
|> Enum.chunk_every(3)
|> Enum.map_join(",", &Enum.reverse/1)
|> String.reverse()
|> String.reverse()
end

@doc """
Strips the "Elixir." prefix from a module name string.
"""
def strip_elixir_prefix(module_str) when is_binary(module_str) do
String.replace_prefix(module_str, "Elixir.", "")
end
end
60 changes: 60 additions & 0 deletions lib/mix/tasks/durable.cancel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Mix.Tasks.Durable.Cancel do
@shortdoc "Cancels a workflow execution"

@moduledoc """
Cancels a running, pending, or waiting workflow execution.

## Usage

mix durable.cancel WORKFLOW_ID [options]

## Options

* `--reason REASON` - Cancellation reason
* `--name NAME` - The Durable instance name (default: Durable)

## Examples

mix durable.cancel abc12345-...
mix durable.cancel abc12345-... --reason "User requested cancellation"
"""

use Mix.Task

alias Durable.Executor
alias Durable.Mix.Helpers

@impl Mix.Task
def run(args) do
Helpers.ensure_started()

{opts, positional, _} =
OptionParser.parse(args, strict: [reason: :string, name: :string])

case positional do
[workflow_id | _] ->
cancel_workflow(workflow_id, opts)

[] ->
Mix.shell().error("Usage: mix durable.cancel WORKFLOW_ID [--reason REASON]")
end
end

defp cancel_workflow(workflow_id, opts) do
durable_name = Helpers.get_durable_name(opts)
reason = Keyword.get(opts, :reason)

case Executor.cancel_workflow(workflow_id, reason, durable: durable_name) do
:ok ->
Mix.shell().info("Workflow #{Helpers.truncate_id(workflow_id)} cancelled.")

{:error, :not_found} ->
Mix.shell().error("Workflow #{workflow_id} not found.")

{:error, :already_completed} ->
Mix.shell().error(
"Workflow #{workflow_id} has already completed and cannot be cancelled."
)
end
end
end
165 changes: 165 additions & 0 deletions lib/mix/tasks/durable.cleanup.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
defmodule Mix.Tasks.Durable.Cleanup do
@shortdoc "Deletes old workflow executions"

@moduledoc """
Deletes old workflow executions from the database.

Cascade deletes handle associated step executions, pending inputs, and events.

## Usage

mix durable.cleanup --older-than DURATION [options]

## Options

* `--older-than DURATION` - Required. Delete executions older than this duration.
Supports: `30d` (days), `24h` (hours), `60m` (minutes)
* `--status STATUS` - Only delete executions with this status (default: completed, failed).
Can be specified multiple times.
* `--dry-run` - Show how many records would be deleted without deleting
* `--batch-size N` - Number of records to delete per batch (default: 1000)
* `--name NAME` - The Durable instance name (default: Durable)

## Examples

mix durable.cleanup --older-than 30d
mix durable.cleanup --older-than 24h --status completed --dry-run
mix durable.cleanup --older-than 7d --batch-size 500
"""

use Mix.Task

import Ecto.Query

alias Durable.Config
alias Durable.Mix.Helpers
alias Durable.Repo
alias Durable.Storage.Schemas.WorkflowExecution

@default_statuses [:completed, :failed]
@default_batch_size 1000

@impl Mix.Task
def run(args) do
Helpers.ensure_started()

{opts, _, _} =
OptionParser.parse(args,
strict: [
older_than: :string,
status: [:string, :keep],
dry_run: :boolean,
batch_size: :integer,
name: :string
]
)

with {:ok, cutoff} <- parse_older_than(opts),
{:ok, statuses} <- parse_statuses(opts) do
durable_name = Helpers.get_durable_name(opts)
config = Config.get(durable_name)
dry_run = Keyword.get(opts, :dry_run, false)
batch_size = Keyword.get(opts, :batch_size, @default_batch_size)

if dry_run do
run_dry(config, cutoff, statuses)
else
run_cleanup(config, cutoff, statuses, batch_size)
end
end
end

defp parse_older_than(opts) do
case Keyword.get(opts, :older_than) do
nil ->
Mix.shell().error("--older-than is required. Example: --older-than 30d")
:error

duration_str ->
parse_duration(duration_str)
end
end

defp parse_duration(str) do
case Regex.run(~r/^(\d+)([dhm])$/, str) do
[_, num_str, unit] ->
num = String.to_integer(num_str)
seconds = duration_to_seconds(num, unit)
cutoff = DateTime.add(DateTime.utc_now(), -seconds, :second)
{:ok, cutoff}

nil ->
Mix.shell().error(
"Invalid duration: #{str}. Use format like 30d (days), 24h (hours), or 60m (minutes)."
)

:error
end
end

defp duration_to_seconds(num, "d"), do: num * 86_400
defp duration_to_seconds(num, "h"), do: num * 3_600
defp duration_to_seconds(num, "m"), do: num * 60

defp parse_statuses(opts) do
case Keyword.get_values(opts, :status) do
[] ->
{:ok, @default_statuses}

status_strings ->
statuses =
Enum.map(status_strings, fn s ->
String.to_existing_atom(s)
end)

{:ok, statuses}
end
rescue
ArgumentError ->
Mix.shell().error("Invalid status provided.")
:error
end

defp run_dry(config, cutoff, statuses) do
count = count_matching(config, cutoff, statuses)
status_str = Enum.map_join(statuses, ", ", &to_string/1)

Mix.shell().info(
"Dry run: #{Helpers.format_number(count)} executions would be deleted " <>
"(status: #{status_str}, older than #{Helpers.format_datetime(cutoff)})."
)
end

defp run_cleanup(config, cutoff, statuses, batch_size) do
total = do_batch_delete(config, cutoff, statuses, batch_size, 0)
Mix.shell().info("Deleted #{Helpers.format_number(total)} workflow executions.")
end

defp do_batch_delete(config, cutoff, statuses, batch_size, acc) do
ids_query =
from(w in WorkflowExecution,
where: w.status in ^statuses and w.inserted_at < ^cutoff,
select: w.id,
limit: ^batch_size
)

delete_query = from(w in WorkflowExecution, where: w.id in subquery(ids_query))
{deleted, _} = Repo.delete_all(config, delete_query)

if deleted > 0 do
do_batch_delete(config, cutoff, statuses, batch_size, acc + deleted)
else
acc
end
end

defp count_matching(config, cutoff, statuses) do
query =
from(w in WorkflowExecution,
where: w.status in ^statuses and w.inserted_at < ^cutoff,
select: count(w.id)
)

Repo.one(config, query)
end
end
Loading