diff --git a/README.md b/README.md index 2ab70ca..5f448d4 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ A durable, resumable workflow engine for Elixir. Similar to Temporal/Inngest. - **Compensations** - Saga pattern with automatic rollback - **Cron Scheduling** - Recurring workflows with cron expressions - **Reliability** - Automatic retries with exponential/linear/constant backoff +- **Orchestration** - Parent/child workflow composition - **Persistence** - PostgreSQL-backed execution state ## Installation @@ -439,6 +440,38 @@ hours(2) # 7_200_000 ms days(7) # 604_800_000 ms ``` +### Orchestration + +```elixir +use Durable.Orchestration + +# Synchronous: call child and wait for result +case call_workflow(MyApp.PaymentWorkflow, %{"amount" => 100}, timeout: hours(1)) do + {:ok, result} -> {:ok, assign(data, :payment, result)} + {:error, reason} -> {:error, reason} +end + +# Fire-and-forget: start child and continue +{:ok, child_id} = start_workflow(MyApp.EmailWorkflow, %{"to" => email}, ref: :welcome) + +# call_workflow also works inside parallel blocks (executed inline) +parallel do + step :payment, fn data -> + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, ref: :pay) do + {:ok, result} -> {:ok, assign(data, :payment, result)} + {:error, reason} -> {:error, reason} + end + end + + step :shipping, fn data -> + case call_workflow(MyApp.ShippingWorkflow, %{"id" => data.order_id}, ref: :ship) do + {:ok, result} -> {:ok, assign(data, :shipping, result)} + {:error, reason} -> {:error, reason} + end + end +end +``` + ### API ```elixir @@ -449,6 +482,7 @@ Durable.list_executions(workflow: Module, status: :running) Durable.cancel(id, "reason") Durable.send_event(id, "event", payload) Durable.provide_input(id, "input_name", data) +Durable.list_children(parent_id) ``` ## Guides @@ -457,10 +491,10 @@ Durable.provide_input(id, "input_name", data) - [Parallel](guides/parallel.md) - Concurrent execution - [Compensations](guides/compensations.md) 
- Saga pattern - [Waiting](guides/waiting.md) - Sleep, events, human input +- [Orchestration](guides/orchestration.md) - Parent/child workflow composition ## Coming Soon -- Workflow orchestration (parent/child workflows) - Phoenix LiveView dashboard ## License diff --git a/agents/IMPLEMENTATION_PLAN.md b/agents/IMPLEMENTATION_PLAN_ARCHIVED.md similarity index 99% rename from agents/IMPLEMENTATION_PLAN.md rename to agents/IMPLEMENTATION_PLAN_ARCHIVED.md index 4c39e87..0adde58 100644 --- a/agents/IMPLEMENTATION_PLAN.md +++ b/agents/IMPLEMENTATION_PLAN_ARCHIVED.md @@ -1,5 +1,13 @@ # DurableWorkflow Implementation Plan +> **⚠️ ARCHIVED (2026-01-23):** This document is no longer maintained. +> See `WORKPLAN.md` for current status and `arch.md` for technical reference. +> +> Key changes since this document was written: +> - ForEach primitive was **removed** (use `Enum.map` instead) +> - Parallel execution uses new results model (`__results__`, `into:`, `returns:`) +> - Loop primitive was **never implemented** (use step retries or `Enum` functions) + ## Executive Summary This document outlines the complete implementation plan for **Durable**, a durable, resumable workflow engine for Elixir. 
diff --git a/agents/WORKPLAN.md b/agents/WORKPLAN.md index dfa85bf..d7393c0 100644 --- a/agents/WORKPLAN.md +++ b/agents/WORKPLAN.md @@ -1,8 +1,8 @@ # Durable Workflow Engine - Work Plan -**Last Updated:** 2026-01-03 +**Last Updated:** 2026-03-08 **Overall Progress:** ~75% Complete -**Reference:** See `IMPLEMENTATION_PLAN.md` for detailed design and code examples +**Reference:** See `arch.md` for technical architecture --- @@ -10,10 +10,10 @@ | Metric | Value | |--------|-------| -| Source Modules | 41 | -| Passing Tests | 214 | +| Source Modules | 42 | +| Passing Tests | ~291 | | Documentation Guides | 6 | -| Lines of Code | ~8,500 | +| Lines of Code | ~11,000 | --- @@ -24,8 +24,8 @@ | 0 | Project Foundation | Complete | 100% | | 1 | Core MVP | Complete | 100% | | 2 | Observability | Partial | 40% | -| 3 | Advanced Features | Mostly Complete | 85% | -| 4 | Scalability | Not Started | 0% | +| 3 | Advanced Features | Mostly Complete | 90% | +| 4 | Scalability | Partial | ~5% | | 5 | Developer Experience | Partial | 35% | --- @@ -86,9 +86,9 @@ --- -## Phase 3: Advanced Features [85%] +## Phase 3: Advanced Features [90%] -### 3.1-3.3 Wait Primitives [COMPLETE - 46 tests] +### 3.1-3.3 Wait Primitives [COMPLETE - 52 tests] | Feature | Status | |---------|--------| @@ -105,7 +105,7 @@ | Timeout handling | Complete | | Context preservation | Complete | -### 3.4 Conditional Branching [COMPLETE - 10 tests] +### 3.4 Conditional Branching [COMPLETE - 19 tests] | Feature | Status | |---------|--------| @@ -117,32 +117,33 @@ ### 3.5 Loops [SKIPPED] -Intentionally skipped - use step-level retries or `foreach` instead. +Intentionally skipped - use step-level retries or Elixir's `Enum` functions instead. 
-### 3.6 Parallel Execution [COMPLETE - 13 tests] +### 3.6 Parallel Execution [COMPLETE - 20 tests] | Feature | Status | |---------|--------| | `parallel do` macro | Complete | -| Context merge strategies | Complete | +| Results model (`__results__`) | Complete | +| `into:` custom merge function | Complete | +| `returns:` option | Complete | | Error strategies | Complete | | Resume durability | Complete | -### 3.7 ForEach [COMPLETE - 13 tests] +See `guides/parallel.md` for comprehensive documentation. -| Feature | Status | -|---------|--------| -| `foreach` macro | Complete | -| `current_item/0`, `current_index/0` | Complete | -| Sequential/Concurrent modes | Complete | -| `:collect_as` option | Complete | -| `:on_error` handling | Complete | +### 3.7 ForEach [REMOVED] + +**Decision (2026-01-23):** The `foreach` primitive was removed. Users should use +Elixir's built-in enumeration tools (`Enum.map`, `Task.async_stream`) for batch +processing instead. This simplifies the DSL while providing the same functionality +through idiomatic Elixir. ### 3.8 Switch/Case [NOT STARTED] Low priority - `branch` macro covers most cases. -### 3.9 Compensation/Saga [COMPLETE - 6 tests] +### 3.9 Compensation/Saga [COMPLETE - 10 tests] | Feature | Status | |---------|--------| @@ -151,7 +152,7 @@ Low priority - `branch` macro covers most cases. | Reverse-order execution | Complete | | CompensationRunner | Complete | -### 3.10 Cron Scheduling [COMPLETE - 45 tests] +### 3.10 Cron Scheduling [COMPLETE - 49 tests] | Feature | Status | |---------|--------| @@ -165,22 +166,35 @@ Low priority - `branch` macro covers most cases. 
| Manual trigger | Complete | | Telemetry events | Complete | +### 3.11 Workflow Orchestration [COMPLETE - 12 tests] + +| Feature | Status | +|---------|--------| +| `call_workflow/3` (synchronous) | Complete | +| `start_workflow/3` (fire-and-forget) | Complete | +| `call_workflow` in `parallel` blocks (inline execution) | Complete | +| Idempotent resume | Complete | +| Cascade cancellation | Complete | +| Parent notification on child complete/fail | Complete | +| Nested workflows (A → B → C) | Complete | +| `Durable.list_children/2` API | Complete | + +See `guides/orchestration.md` for comprehensive documentation. + ### Remaining Phase 3 Work | Feature | Priority | Complexity | |---------|----------|------------| -| Workflow Orchestration (`call_workflow`) | High | Medium | -| Parent-child tracking | High | Low | | Switch/Case macro | Low | Low | | Pipe-based API | Low | Medium | --- -## Phase 4: Scalability [0%] +## Phase 4: Scalability [~5%] | Feature | Priority | Complexity | |---------|----------|------------| -| Queue Adapter Behaviour | Complete | - | +| Queue Adapter Behaviour | **Complete** | - | | Redis Queue Adapter | Medium | Medium | | RabbitMQ Queue Adapter | Low | Medium | | Message Bus Behaviour | Medium | Low | @@ -229,25 +243,24 @@ Note: Multi-node scheduling already works via `FOR UPDATE SKIP LOCKED`. ### High Priority 1. Guide: Getting Started -2. Workflow Orchestration (`call_workflow`) -3. HexDocs Publishing -4. `mix durable.status` +2. HexDocs Publishing +3. `mix durable.status` ### Medium Priority -5. Guide: Testing Workflows -6. `Durable.TestCase` -7. Graph Generation -8. `mix durable.list` -9. pg_notify Message Bus +4. Guide: Testing Workflows +5. `Durable.TestCase` +6. Graph Generation +7. `mix durable.list` +8. pg_notify Message Bus ### Lower Priority -10. Switch/Case macro -11. Redis Queue Adapter -12. Phoenix Dashboard -13. Example Project -14. Pipe-based API +9. Switch/Case macro +10. Redis Queue Adapter +11. Phoenix Dashboard +12. 
Example Project +13. Pipe-based API --- @@ -255,34 +268,66 @@ Note: Multi-node scheduling already works via `FOR UPDATE SKIP LOCKED`. | Test File | Tests | Area | |-----------|-------|------| -| scheduler_test.exs | 45 | Cron scheduling | -| wait_test.exs | 46 | Wait primitives | -| decision_test.exs | 13 | Decision steps | -| parallel_test.exs | 13 | Parallel execution | -| foreach_test.exs | 13 | ForEach iteration | +| wait_test.exs | 52 | Wait primitives | +| scheduler_test.exs | 49 | Cron scheduling | +| parallel_test.exs | 20 | Parallel execution | +| branch_test.exs | 19 | Branch macro | +| postgres_test.exs | 16 | Queue adapter | +| decision_test.exs | 14 | Decision steps | | log_capture_test.exs | 13 | Log/IO capture | +| orchestration_test.exs | 12 | Workflow orchestration | | integration_test.exs | 11 | End-to-end flows | -| branch_test.exs | 10 | Branch macro | -| durable_test.exs | 8 | Core API | -| compensation_test.exs | 6 | Saga pattern | -| Other | ~36 | Queue, handlers, etc. | -| **Total** | **214** | | +| validation_test.exs | 10 | Input validation | +| context_test.exs | 10 | Context management | +| compensation_test.exs | 10 | Saga pattern | +| durable_test.exs | 10 | Core API | +| handler_test.exs | 8 | Log handler | +| io_server_test.exs | 7 | IO capture | +| resume_edge_cases_test.exs | 5 | Resume edge cases | +| log_capture/integration_test.exs | 5 | Log capture integration | +| Other | ~20 | Misc | +| **Total** | **~291** | | --- ## Known Limitations -1. Wait primitives not supported in parallel/foreach blocks -2. No backward jumps in decision steps (forward-only by design) -3. Context is single-level atomized (top-level keys only) -4. No workflow versioning +1. Wait primitives not supported in parallel blocks +2. Child workflows with waits (`sleep`, `wait_for_event`) not supported in parallel blocks +3. No backward jumps in decision steps (forward-only by design) +4. Context is single-level atomized (top-level keys only) +5. 
No workflow versioning +6. No foreach/loop DSL primitives (use Elixir's `Enum` functions) --- ## Next Steps 1. **Documentation** - Getting Started guide and HexDocs publishing -2. **Workflow Orchestration** - Child workflow support (`call_workflow`) -3. **Graph Visualization** - Understanding complex workflows +2. **Graph Visualization** - Understanding complex workflows +3. **Testing Helpers** - `Durable.TestCase` for easier workflow testing + +The existing ~291 tests provide good confidence in implemented features. Suitable for internal use; additional documentation needed before public release. + +--- -The existing 214 tests provide good confidence in implemented features. Suitable for internal use; additional documentation needed before public release. +## Changelog + +### 2026-02-27 +- Added `call_workflow` support inside `parallel` blocks (inline synchronous execution) +- Child workflows in parallel execute synchronously with process state save/restore +- 3 new tests for parallel call_workflow (total: ~291) +- Updated guides/orchestration.md, guides/parallel.md, and README.md +- Added workflow orchestration: `call_workflow/3` (synchronous) and `start_workflow/3` (fire-and-forget) +- Added `Durable.Orchestration` module with `use Durable.Orchestration` macro +- Added cascade cancellation (cancelling parent cancels active children) +- Added parent notification on child completion/failure +- Added `Durable.list_children/2` API +- Added `guides/orchestration.md` documentation +- 12 new tests for orchestration + +### 2026-01-23 +- Removed `foreach` primitive (use `Enum.map` or `Task.async_stream` instead) +- Updated parallel execution with new results model (`__results__`, `into:`, `returns:`) +- Updated documentation in `guides/parallel.md` +- Archived `IMPLEMENTATION_PLAN.md` (now `IMPLEMENTATION_PLAN_ARCHIVED.md`) diff --git a/agents/arch.md b/agents/arch.md index fcb9301..ba7abb7 100644 --- a/agents/arch.md +++ b/agents/arch.md @@ -35,13 +35,13 @@ │ │ │ │ 
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐ │ │ │ Queue │ │ Message │ │ Scheduler │ │ -│ │ Manager │ │ Bus │ │ (Cron) │ │ +│ │ Manager │ │ Bus [P] │ │ (Cron) │ │ │ └─────────────┘ └──────────────┘ └─────────────────┘ │ │ │ │ │ │ ┌─────────────┐ ┌──────────────┐ ┌─────────────────┐ │ -│ │ Postgres/ │ │ PubSub/ │ │ Graph │ │ -│ │ Redis/ │ │ Redis/ │ │ Generator │ │ -│ │ RabbitMQ │ │ pg_notify │ │ + Layout │ │ +│ │ Postgres │ │ (planned) │ │ Graph [P] │ │ +│ │ │ │ │ │ Generator │ │ +│ │ │ │ │ │ + Layout │ │ │ └─────────────┘ └──────────────┘ └─────────────────┘ │ │ │ │ ┌──────────────────────────────────────────────────────┐ │ @@ -64,7 +64,7 @@ defmodule OrderWorkflow do use Durable use Durable.Context # context(), get_context(), put_context() - use Durable.Wait # wait_for_input(), sleep_for(), wait_for_event() + use Durable.Wait # wait_for_input(), sleep(), wait_for_event() workflow "process_order", timeout: hours(2), max_retries: 3 do @@ -183,111 +183,142 @@ end Note: The `branch` macro is preferred for new workflows as it's more readable and allows multiple steps per branch. -### Loops - -```elixir -loop :retry_until_success, - while: fn ctx -> !ctx.success && ctx.current_retry < ctx.max_retries end do - - step :attempt_api_call do - case ExternalAPI.call() do - {:ok, result} -> put_context(:success, true) - {:error, _} -> update_context(:current_retry, & &1 + 1) - end - end - - step :backoff do - unless context().success do - delay = :math.pow(2, context().current_retry) |> round() - sleep_for(seconds: delay) - end - end -end -``` - ### Parallel Execution +Execute multiple steps concurrently. Results are stored in a structured format. 
+ ```elixir parallel do step :send_welcome_email do - EmailService.send_welcome(state().user.email) + EmailService.send_welcome(get_context(:user).email) end - + step :provision_workspace do - WorkspaceService.create(state().user.id) + WorkspaceService.create(get_context(:user_id)) end - + step :create_stripe_customer do - StripeService.create_customer(state().user) + StripeService.create_customer(get_context(:user)) + end +end + +# Results automatically stored as: +# context.__results__ = %{ +# send_welcome_email: {:ok, email_result}, +# provision_workspace: {:ok, workspace_result}, +# create_stripe_customer: {:ok, stripe_result} +# } + +# Access results in subsequent steps: +step :finalize do + results = parallel_results() # Get all results + email = parallel_result(:send_welcome_email) # Get specific result + + if parallel_ok?(:provision_workspace) do + # All good end end ``` -### ForEach +**Custom result handling with `into:`:** ```elixir -foreach :process_items, items: fn -> context().items end do |item| - step :process_item do - result = ItemProcessor.process(item) - append_context(:results, result) - end +parallel into: fn ctx, results -> + # Custom merge function receives context and results map + successful = Enum.filter(results, fn {_k, v} -> match?({:ok, _}, v) end) + put_context(:successful_count, length(successful)) +end do + step :task_a do ... end + step :task_b do ... end end ``` +**Options:** +- `into:` - Custom function to merge results into context +- `returns:` - Specify which step's result to return (`:first_completed`, `:all`, or step name) +- Error handling: Failures are captured in results, workflow continues + +See `guides/parallel.md` for comprehensive documentation. + ### Workflow Orchestration -Call child workflows from parent steps to compose larger workflows: +Call child workflows from parent steps to compose larger workflows. +Requires `use Durable.Orchestration`. 
```elixir -workflow "order_pipeline" do - step :validate do - put_context(:order, input()["order"]) - end +defmodule OrderPipeline do + use Durable + use Durable.Context + use Durable.Orchestration - # Call child workflow and wait for result - step :process_payment do - {:ok, result} = call_workflow(MyApp.PaymentWorkflow, %{ - order_id: get_context(:order).id, - amount: get_context(:order).total - }) - put_context(:payment, result) - end + workflow "order_pipeline" do + step :validate do + put_context(:order, input()["order"]) + end - # Fire-and-forget (don't wait for completion) - step :send_notifications do - start_workflow(MyApp.NotificationWorkflow, %{ - user_id: get_context(:order).user_id, - event: :order_completed - }) - end + # Call child workflow and wait for result + step :process_payment do + {:ok, result} = call_workflow(MyApp.PaymentWorkflow, %{ + order_id: get_context(:order).id, + amount: get_context(:order).total + }) + put_context(:payment, result) + end + + # Fire-and-forget (don't wait for completion) + step :send_notifications do + start_workflow(MyApp.NotificationWorkflow, %{ + user_id: get_context(:order).user_id, + event: :order_completed + }) + end + + # Call workflows in parallel + parallel do + step :sync_crm do + call_workflow(MyApp.CRMSyncWorkflow, %{order: get_context(:order)}) + end + + step :generate_invoice do + call_workflow(MyApp.InvoiceWorkflow, %{order: get_context(:order)}) + end + end - step :finalize do - OrderService.complete(get_context(:order).id) + step :finalize do + OrderService.complete(get_context(:order).id) + end end end ``` **Options:** -- `call_workflow/3` - Start child and wait for result -- `start_workflow/3` - Fire-and-forget +- `call_workflow/2,3` - Start child and wait for result +- `start_workflow/2,3` - Fire-and-forget - Parent-child relationships tracked via `parent_workflow_id` +- `Durable.list_children/2` - List child workflows +- Cascade cancellation: cancelling parent cancels active children + +See 
`guides/orchestration.md` for comprehensive documentation. ### Switch/Case +> **Note:** Not yet implemented. Use the `branch` macro instead for conditional execution. + ```elixir +# PLANNED - NOT IMPLEMENTED switch :route_by_category, on: fn -> context().category end do case_match "billing" do step :assign_to_billing do TicketService.assign(input().ticket, team: :billing) end end - + case_match "technical" do step :assign_to_engineering do TicketService.assign(input().ticket, team: :engineering) end end - + default do step :assign_to_general_support do TicketService.assign(input().ticket, team: :general) @@ -296,6 +327,15 @@ switch :route_by_category, on: fn -> context().category end do end ``` +**Current Alternative:** Use the `branch` macro with pattern matching: +```elixir +branch on: get_context(:category) do + "billing" -> step :assign_billing do ... end + "technical" -> step :assign_engineering do ... end + _ -> step :assign_general do ... end +end +``` + --- ## Context Management @@ -336,9 +376,13 @@ workflow_id() current_step() # Accumulators -init_accumulator(:events, []) append_context(:events, new_event) increment_context(:counter, 1) + +# Parallel Results (after parallel block) +parallel_results() # Get all parallel results as map +parallel_result(:step_name) # Get specific step result +parallel_ok?(:step_name) # Check if step succeeded ``` --- @@ -348,14 +392,14 @@ increment_context(:counter, 1) ### Sleep ```elixir -# Sleep for duration -sleep_for(seconds: 30) -sleep_for(minutes: 5) -sleep_for(hours: 24) -sleep_for(days: 7) +# Sleep for duration (using time helpers from Durable.DSL.TimeHelpers) +sleep(seconds(30)) +sleep(minutes(5)) +sleep(hours(24)) +sleep(days(7)) # Sleep until specific time -sleep_until(~U[2025-12-25 00:00:00Z]) +schedule_at(~U[2025-12-25 00:00:00Z]) ``` ### Wait for Events @@ -428,30 +472,13 @@ Every step automatically captures: ### Implementation ```elixir -# Custom Logger backend -defmodule Durable.LoggerBackend do - @behaviour 
:gen_event - - def handle_event({level, _gl, {Logger, msg, ts, metadata}}, state) do - case Process.get(:workflow_context) do - %{workflow_id: wf_id, step: step, attempt: attempt} -> - log_entry = %{ - timestamp: format_timestamp(ts), - level: level, - message: IO.iodata_to_binary(msg), - metadata: Map.new(metadata), - workflow_id: wf_id, - step: step, - attempt: attempt - } - - store_log(log_entry) - end - end +# Logger handler (Erlang :logger handler, not gen_event backend) +defmodule Durable.LogCapture.Handler do + # Captures Logger calls per-step using process dictionary context end # IO capture via group leader -defmodule Durable.IOCapture do +defmodule Durable.LogCapture.IOServer do # Intercepts IO.puts/IO.inspect and stores as logs end ``` @@ -460,25 +487,10 @@ end ```elixir # Get logs for specific step -{:ok, logs} = Durable.get_step_logs(workflow_id, step: :charge_payment) - -# Get all logs for workflow -{:ok, all_logs} = Durable.get_execution_logs(workflow_id) +{:ok, logs} = Durable.Query.get_step_logs(workflow_id, :charge_payment) -# Real-time log streaming -Durable.stream_logs(workflow_id) - -# Logs stored in database -execution.steps -# => [ -# %StepExecution{ -# step: :charge_payment, -# logs: [ -# %{timestamp: ~U[...], level: :info, message: "Attempting payment"}, -# %{timestamp: ~U[...], level: :error, message: "Payment failed"} -# ] -# } -# ] +# Logs stored in step_executions table as JSONB +# Each step execution record contains its captured logs ``` --- @@ -560,34 +572,29 @@ end ### Built-in Adapters -1. **PostgreSQL** (default) - Advisory locks + polling -2. **Redis** - Sorted sets with priorities -3. **RabbitMQ** - Priority queues -4. **Kafka** - Topic-based -5. **NATS** - JetStream +1. **PostgreSQL** (default) - `Durable.Queue.Adapters.Postgres` — `FOR UPDATE SKIP LOCKED` + polling + +> **Planned — not yet implemented:** +> 2. **Redis** - Sorted sets with priorities +> 3. **RabbitMQ** - Priority queues +> 4. **Kafka** - Topic-based +> 5. 
**NATS** - JetStream ### Configuration +Durable is configured via the supervision tree, not application config: + ```elixir -# config/config.exs -config :durable_workflow, - # Default: PostgreSQL - queue_adapter: Durable.Queue.PostgresAdapter, - queue_adapter_opts: [repo: MyApp.Repo], - - # Or Redis - # queue_adapter: Durable.Queue.RedisAdapter, - # queue_adapter_opts: [host: "localhost", port: 6379], - - # Or RabbitMQ - # queue_adapter: Durable.Queue.RabbitMQAdapter, - # queue_adapter_opts: [url: "amqp://guest:guest@localhost"], - - queues: %{ - default: [concurrency: 10], - high_priority: [concurrency: 20], - background: [concurrency: 5] - } +children = [ + MyApp.Repo, + {Durable, + repo: MyApp.Repo, + queues: %{ + default: [concurrency: 10], + high_priority: [concurrency: 20], + background: [concurrency: 5] + }} +] ``` ### Usage @@ -602,11 +609,10 @@ config :durable_workflow, scheduled_at: DateTime.add(DateTime.utc_now(), 3600, :second) ) -# Queue operations -Durable.Queue.pause(:low_priority) -Durable.Queue.resume(:low_priority) -Durable.Queue.get_stats(:default) -# => %{running: 7, pending: 23, concurrency: 10} +# Queue operations (via Durable.Queue.Manager) +Durable.Queue.Manager.pause(Durable, :low_priority) +Durable.Queue.Manager.resume(Durable, :low_priority) +Durable.Queue.Manager.stats(Durable, :default) ``` --- @@ -618,58 +624,62 @@ Durable.Queue.get_stats(:default) ```elixir defmodule ReportWorkflow do use Durable - use Durable.Cron - + use Durable.Scheduler.DSL + # Daily report at 9 AM - @cron "0 9 * * *" - @cron_queue :reports - @cron_input %{type: :daily} - @cron_timezone "America/New_York" + @schedule cron: "0 9 * * *", queue: :reports, input: %{type: :daily}, timezone: "America/New_York" workflow "daily_report" do step :generate_report do ReportService.generate(input().type) end end - + # Every hour - @cron "0 * * * *" + @schedule cron: "0 * * * *" workflow "hourly_sync" do step :sync_data do DataService.sync() end end - + # Every 15 minutes - 
@cron "*/15 * * * *" + @schedule cron: "*/15 * * * *" workflow "health_check" do step :check_services do HealthCheckService.check_all() end end end - -# Auto-register on app start -MyApp.ReportWorkflow.schedule_all_crons() ``` ### Manual Scheduling (Alternative) ```elixir -Durable.Scheduler.schedule( - "daily_report", +Durable.schedule( ReportWorkflow, - "generate_report", - "0 9 * * *", + "daily_report", + cron: "0 9 * * *", input: %{type: :daily}, queue: :reports, timezone: "America/New_York" ) + +# CRUD API +Durable.list_schedules(Durable) +Durable.get_schedule(Durable, schedule_id) +Durable.update_schedule(Durable, attrs) +Durable.delete_schedule(Durable, schedule_id) +Durable.enable_schedule(Durable, schedule_id) +Durable.disable_schedule(Durable, schedule_id) +Durable.trigger_schedule(Durable, schedule_id) ``` --- ## Message Bus (Pluggable) +> **Planned — not yet implemented.** The designs below are kept as future reference. + ### Message Bus Behavior ```elixir @@ -726,6 +736,8 @@ Durable.Events.publish_workflow_event(workflow_id, :custom_event, %{data: "..."} ## Graph Visualization +> **Planned — not yet implemented.** The designs below are kept as future reference. 
+ ### Graph Generation ```elixir @@ -890,6 +902,26 @@ create table(:scheduled_workflows) do add :next_run_at, :utc_datetime_usec timestamps() end + +# pending_events (wait_for_event / wait_for_any / wait_for_all) +create table(:pending_events) do + add :workflow_execution_id, references(:workflow_executions) + add :event_name, :string + add :status, :string # pending, received, expired + add :payload, :map + add :timeout_at, :utc_datetime_usec + timestamps() +end + +# wait_groups (wait_for_all / wait_for_any coordination) +create table(:wait_groups) do + add :workflow_execution_id, references(:workflow_executions) + add :step_name, :string + add :strategy, :string # all, any + add :status, :string # pending, completed, expired + add :timeout_at, :utc_datetime_usec + timestamps() +end ``` --- @@ -924,33 +956,17 @@ end {:ok, execution} = Durable.get_execution(workflow_id, include_logs: true) # List executions -executions = Durable.list_executions( +executions = Durable.Query.list_executions( workflow: OrderWorkflow, status: :running, limit: 50 ) -# Query with filters -executions = Durable.Query.find_executions( - workflow: OrderWorkflow, - current_step: :charge_payment, - status: :running, - time_range: [from: ~U[2025-01-01 00:00:00Z], to: DateTime.utc_now()] -) +# Get step logs +{:ok, logs} = Durable.Query.get_step_logs(workflow_id, :charge_payment) -# Get metrics -metrics = Durable.get_metrics( - OrderWorkflow, - period: :last_24_hours -) -# => %{ -# total_executions: 1234, -# successful: 1180, -# failed: 54, -# success_rate: 0.956, -# avg_duration_ms: 2340, -# p95_duration_ms: 4500 -# } +# List child workflows +children = Durable.list_children(Durable, parent_workflow_id) ``` ### Controlling Workflows @@ -1209,13 +1225,15 @@ Benefits: ### Phase 3: Advanced Features - [x] Wait primitives (sleep, wait_for_event, wait_for_input) - [x] Decision steps (legacy `decision` + `{:goto}`) -- [x] Branch macro (new intuitive conditional flow) -- [ ] Loops and iterations -- [ ] 
Parallel execution -- [ ] Workflow orchestration (call child workflows) +- [x] Branch macro (intuitive conditional flow) +- [x] Parallel execution (with results model) +- [x] Compensation/saga +- [x] Cron scheduling +- [~] ForEach - **REMOVED** (use `Enum.map` instead) +- [~] Loops - **Skipped** (use step retries or `Enum` functions) +- [x] Workflow orchestration (call child workflows) +- [ ] Switch/case macro - [ ] Pipe-based API (functional workflow composition) -- [ ] Compensation/saga -- [ ] Cron scheduling ### Phase 4: Scalability - [ ] Redis queue adapter @@ -1224,6 +1242,8 @@ Benefits: - [ ] Horizontal scaling support ### Phase 5: Developer Experience +- [x] Module documentation (@moduledoc, @doc) +- [x] 6 documentation guides - [ ] CLI tools - [ ] Mix tasks - [ ] Testing helpers @@ -1285,35 +1305,28 @@ end ## Configuration Reference +Durable is configured via the supervision tree (not application config): + ```elixir -# config/config.exs -config :durable_workflow, - # Queue adapter - queue_adapter: Durable.Queue.PostgresAdapter, - queue_adapter_opts: [repo: MyApp.Repo], - - # Queues - queues: %{ - default: [concurrency: 10], - high_priority: [concurrency: 20], - background: [concurrency: 5] - }, - - # Message bus - message_bus: Durable.MessageBus.PostgresAdapter, - message_bus_opts: [repo: MyApp.Repo], - - # Scheduler - scheduler: [ - enabled: true, - timezone: "America/New_York" - ], - - # Retention - retention: [ - completed: [days: 30], - failed: [days: 90] - ] +children = [ + MyApp.Repo, + {Durable, + # Required + repo: MyApp.Repo, + + # Optional + name: Durable, # Instance name (default: Durable) + prefix: "durable", # PostgreSQL schema (default: "durable") + queue_enabled: true, # Enable queue processing (default: true) + queues: %{ + default: [concurrency: 10, poll_interval: 1_000], + high_priority: [concurrency: 20], + background: [concurrency: 5] + }, + stale_lock_timeout: 300, # Seconds before lock is stale (default: 300) + heartbeat_interval: 
30_000 # Worker heartbeat interval ms (default: 30_000) + } +] ``` --- diff --git a/agents/context-index.md b/agents/context-index.md index 7f51837..2cc54e5 100644 --- a/agents/context-index.md +++ b/agents/context-index.md @@ -8,6 +8,31 @@ This index provides quick access to archived development discussions and impleme |-------|--------------|----------|--------| | [Wait Primitives Complete](./conversations/wait-primitives-complete/) | 2026-01-03 | 1 | Completed | +--- + +## 2026-01-23: ForEach Removal & Parallel Refactor + +**Key Changes:** +- Removed `foreach` primitive entirely +- Simplified parallel execution with new results model (`__results__`, `into:`, `returns:`) +- Updated `guides/parallel.md` with comprehensive documentation +- Consolidated planning docs: archived `IMPLEMENTATION_PLAN.md` + +**Files Changed:** +- `lib/durable/dsl/step.ex` - Removed foreach macro +- `lib/durable/executor.ex` - Removed foreach execution +- `test/durable/foreach_test.exs` - Deleted +- `guides/foreach.md` - Deleted +- `agents/WORKPLAN.md` - Updated status, test counts, added changelog +- `agents/arch.md` - Removed outdated sections, updated parallel docs +- `agents/IMPLEMENTATION_PLAN.md` → `agents/IMPLEMENTATION_PLAN_ARCHIVED.md` + +**Decision Rationale:** Users should use `Enum.map` or `Task.async_stream` for batch +processing. This simplifies the DSL while providing equivalent functionality through +idiomatic Elixir patterns. + +--- + ## Completed Topics | Topic | Completed | Description | @@ -91,15 +116,16 @@ Covers fixing CI failures after the parallel jobs feature. 
Key outcomes: ``` agents/ -├── conversations/ # Archived discussion topics +├── conversations/ # Archived discussion topics │ └── {topic-slug}/ -│ ├── README.md # Topic overview -│ ├── sessions/ # Individual session records +│ ├── README.md # Topic overview +│ ├── sessions/ # Individual session records │ └── implementation-plan.md -├── context-index.md # This file -├── .archived-topics.json # Machine-readable metadata -├── arch.md # Architecture notes -└── WORKPLAN.md # Work planning +├── context-index.md # This file +├── .archived-topics.json # Machine-readable metadata +├── arch.md # Architecture & technical reference +├── WORKPLAN.md # Current status & work planning +└── IMPLEMENTATION_PLAN_ARCHIVED.md # Historical (no longer maintained) ``` --- diff --git a/guides/orchestration.md b/guides/orchestration.md new file mode 100644 index 0000000..c28af98 --- /dev/null +++ b/guides/orchestration.md @@ -0,0 +1,379 @@ +# Workflow Orchestration + +Compose workflows by calling child workflows from parent steps. + +## Setup + +```elixir +defmodule MyApp.MyWorkflow do + use Durable + use Durable.Helpers + use Durable.Context + use Durable.Orchestration # Import orchestration functions +end +``` + +## `call_workflow/3` — Synchronous Child + +Start a child workflow and wait for its result. The parent suspends until the child completes or fails. 
+ +```elixir +workflow "order_pipeline" do + step :charge, fn data -> + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, + timeout: hours(1)) do + {:ok, result} -> + {:ok, assign(data, :payment_id, result["payment_id"])} + {:error, reason} -> + {:error, "Payment failed: #{inspect(reason)}"} + end + end +end +``` + +**Options:** + +| Option | Description | Default | +|--------|-------------|---------| +| `:ref` | Reference name for idempotency | Module name | +| `:timeout` | Max wait time in ms | None (wait forever) | +| `:timeout_value` | Value returned on timeout | `:child_timeout` | +| `:queue` | Queue for child workflow | `"default"` | + +**Return values:** + +| Child Status | Return | +|-------------|--------| +| Completed | `{:ok, child_context}` | +| Failed | `{:error, error_info}` | +| Cancelled | `{:error, error_info}` | +| Timeout | `{:ok, timeout_value}` | + +### How It Works + +1. Parent step calls `call_workflow(ChildModule, input)` +2. Child workflow execution is created with `parent_workflow_id` set +3. Parent suspends (like `wait_for_event`) +4. Child runs in the queue independently +5. When child completes/fails, parent is automatically notified +6. Parent resumes and `call_workflow` returns the result + +## `start_workflow/3` — Fire-and-Forget + +Start a child workflow without waiting. Parent continues immediately. 
+ +```elixir +workflow "onboarding" do + step :send_emails, fn data -> + {:ok, welcome_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => data.email, "template" => "welcome"}, + ref: :welcome_email + ) + {:ok, assign(data, :welcome_workflow_id, welcome_id)} + end + + step :next_step, fn data -> + # Parent continues — doesn't wait for email to send + {:ok, data} + end +end +``` + +**Options:** + +| Option | Description | Default | +|--------|-------------|---------| +| `:ref` | Reference name for idempotency | Module name | +| `:queue` | Queue for child workflow | `"default"` | + +### Idempotency + +Both functions are idempotent on resume. If the parent workflow crashes and restarts: + +- `call_workflow` — Won't create a duplicate child. If the child already completed, returns the result immediately. +- `start_workflow` — Won't create a duplicate child. Returns the same `child_id`. + +The `:ref` option controls idempotency grouping. Use different refs to create multiple children of the same type: + +```elixir +step :send_multiple, fn data -> + {:ok, _} = start_workflow(MyApp.EmailWorkflow, + %{"template" => "welcome"}, ref: :welcome) + + {:ok, _} = start_workflow(MyApp.EmailWorkflow, + %{"template" => "getting_started"}, ref: :getting_started) + + {:ok, data} +end +``` + +## Cascade Cancellation + +Cancelling a parent automatically cancels all active children: + +```elixir +# This cancels the parent AND any pending/running/waiting children +Durable.cancel(parent_workflow_id, "User cancelled order") +``` + +Children that already completed are not affected. 
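The cascade rule, as implemented by `cancel_child_workflows/2` later in this changeset, can be sketched as a plain filter over child statuses. This is a self-contained illustration, not library code; the `CascadeSketch` module name is hypothetical:

```elixir
# Sketch of the cascade rule: only children still in a non-terminal
# state are selected for cancellation; completed (or failed) children
# are left untouched.
defmodule CascadeSketch do
  @cancellable [:pending, :running, :waiting]

  # Given a list of child execution records, return those that a
  # parent cancellation would cascade to.
  def to_cancel(children) do
    Enum.filter(children, &(&1.status in @cancellable))
  end
end
```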
+ +## Querying Children + +List child workflows for a parent: + +```elixir +# All children +children = Durable.list_children(parent_workflow_id) + +# Filter by status +running = Durable.list_children(parent_workflow_id, status: :running) +completed = Durable.list_children(parent_workflow_id, status: :completed) +``` + +## Examples + +### Order Pipeline + +A parent workflow that calls payment and notification children: + +```elixir +defmodule MyApp.PaymentWorkflow do + use Durable + use Durable.Context + + workflow "charge" do + step :process, fn _data -> + amount = input()["amount"] + put_context(:payment_id, "pay_#{:rand.uniform(10_000)}") + put_context(:charged, amount) + end + end +end + +defmodule MyApp.EmailWorkflow do + use Durable + use Durable.Context + + workflow "send_email" do + step :deliver, fn _data -> + to = input()["to"] + template = input()["template"] + Mailer.deliver(to, template) + put_context(:delivered, true) + end + end +end + +defmodule MyApp.OrderWorkflow do + use Durable + use Durable.Context + use Durable.Orchestration + use Durable.Helpers + + workflow "process_order" do + step :validate, fn _data -> + order = input() + put_context(:order_id, order["id"]) + put_context(:total, order["total"]) + put_context(:email, order["email"]) + end + + # Synchronous — wait for payment result + step :charge_payment, fn data -> + case call_workflow(MyApp.PaymentWorkflow, + %{"amount" => data.total}, timeout: hours(1)) do + {:ok, result} -> + {:ok, assign(data, :payment_id, result["payment_id"])} + {:error, reason} -> + {:error, "Payment failed: #{inspect(reason)}"} + end + end + + # Fire-and-forget — email sent independently + step :send_confirmation, fn data -> + {:ok, email_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => data.email, "template" => "order_confirmed"}, + ref: :confirmation_email + ) + {:ok, assign(data, :email_workflow_id, email_id)} + end + + step :complete, fn data -> + {:ok, assign(data, :status, "completed")} + end + end +end 
+ +# Start the pipeline +{:ok, id} = Durable.start(MyApp.OrderWorkflow, %{ + "id" => "order_123", + "total" => 99.99, + "email" => "user@example.com" +}) +``` + +### Nested Workflows (A → B → C) + +Workflows can call children that call their own children: + +```elixir +defmodule MyApp.StepC do + use Durable + use Durable.Context + + workflow "step_c" do + step :work, fn _data -> + put_context(:c_result, "done") + end + end +end + +defmodule MyApp.StepB do + use Durable + use Durable.Context + use Durable.Orchestration + + workflow "step_b" do + step :call_c, fn data -> + case call_workflow(MyApp.StepC, %{}) do + {:ok, result} -> + {:ok, assign(data, :c_result, result["c_result"])} + {:error, reason} -> + {:error, reason} + end + end + end +end + +defmodule MyApp.StepA do + use Durable + use Durable.Context + use Durable.Orchestration + + workflow "step_a" do + step :call_b, fn data -> + case call_workflow(MyApp.StepB, %{}) do + {:ok, result} -> + {:ok, assign(data, :b_result, result)} + {:error, reason} -> + {:error, reason} + end + end + end +end +``` + +### Error Handling + +Handle child failures gracefully with branching: + +```elixir +workflow "resilient_order" do + step :try_payment, fn data -> + result = call_workflow(MyApp.PaymentWorkflow, + %{"amount" => data.total}, timeout: minutes(30)) + + case result do + {:ok, payment} -> + {:ok, data |> assign(:payment, payment) |> assign(:payment_status, :success)} + {:error, _reason} -> + {:ok, assign(data, :payment_status, :failed)} + end + end + + branch on: fn ctx -> ctx.payment_status end do + :success -> + step :fulfill, fn data -> + {:ok, assign(data, :fulfilled, true)} + end + + :failed -> + step :notify_failure, fn data -> + Mailer.send_payment_failure(data.email) + {:ok, assign(data, :fulfilled, false)} + end + end +end +``` + +## `call_workflow` Inside `parallel` Blocks + +`call_workflow` works inside `parallel` blocks. 
Child workflows are executed **inline (synchronously)** within the parallel task, so the result is available immediately — no suspend/resume cycle. + +```elixir +workflow "enrich_order" do + step :init, fn input -> + {:ok, %{order_id: input["order_id"]}} + end + + parallel on_error: :complete_all do + step :enrich_customer, fn data -> + case call_workflow(MyApp.CustomerLookup, %{"id" => data.order_id}, ref: :customer) do + {:ok, result} -> {:ok, assign(data, :customer, result)} + {:error, reason} -> {:error, reason} + end + end + + step :enrich_inventory, fn data -> + case call_workflow(MyApp.InventoryCheck, %{"id" => data.order_id}, ref: :inventory) do + {:ok, result} -> {:ok, assign(data, :inventory, result)} + {:error, reason} -> {:error, reason} + end + end + end + + step :process, fn data -> + results = data[:__results__] + # Handle results from parallel call_workflow steps... + {:ok, data} + end +end +``` + +**How it works:** When `call_workflow` detects it's inside a parallel block, it creates the child execution and runs it synchronously via `Executor.execute_workflow` instead of throwing to suspend. The parent's process state is saved beforehand and restored after the child completes. + +**Limitation:** Child workflows that use waits (`sleep`, `wait_for_event`, etc.) are not supported inside parallel blocks — they will return an error since the inline execution cannot suspend. 
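When no `into:` function is given, the parent's next step has to unpack these tagged tuples from `__results__` itself. A small self-contained sketch of that unpacking, assuming the `{:ok, child_context} | {:error, reason}` result shape described above (the `ParallelResults` module name is hypothetical):

```elixir
# Split a __results__ map into successes and failures.
# Each entry is keyed by the step's returns key and tagged
# {:ok, child_context} or {:error, reason}.
defmodule ParallelResults do
  def partition(results) do
    Enum.reduce(results, {%{}, %{}}, fn
      {key, {:ok, value}}, {ok, err} -> {Map.put(ok, key, value), err}
      {key, {:error, reason}}, {ok, err} -> {ok, Map.put(err, key, reason)}
    end)
  end
end
```

A step could then fail fast when the error map is non-empty, or merge the success map into its pipeline data with `assign`.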
+ +## Limitations + +- Child workflows with waits (`sleep`, `wait_for_event`) cannot be used inside `parallel` blocks +- Child workflows run in the queue system — they're not executed inline by default (except in parallel blocks) +- The `:timeout` option requires the timeout checker to be running (same as `wait_for_event`) + +## Best Practices + +### Use Meaningful Refs + +```elixir +# Good — clear what each child does +start_workflow(MyApp.EmailWorkflow, input, ref: :welcome_email) +start_workflow(MyApp.EmailWorkflow, input, ref: :receipt_email) + +# Avoid — will collide if calling same module twice +start_workflow(MyApp.EmailWorkflow, input1) +start_workflow(MyApp.EmailWorkflow, input2) # Returns first child's ID! +``` + +### Handle Both Success and Failure + +```elixir +# Good — handles both cases +case call_workflow(MyApp.PaymentWorkflow, input) do + {:ok, result} -> handle_success(result) + {:error, reason} -> handle_failure(reason) +end + +# Risky — crashes on child failure +{:ok, result} = call_workflow(MyApp.PaymentWorkflow, input) +``` + +### Set Timeouts for call_workflow + +```elixir +# Good — won't wait forever +call_workflow(MyApp.SlowService, input, timeout: hours(2)) + +# Risky — waits indefinitely if child hangs +call_workflow(MyApp.SlowService, input) +``` diff --git a/guides/parallel.md b/guides/parallel.md index 37f6fe4..ec153f8 100644 --- a/guides/parallel.md +++ b/guides/parallel.md @@ -325,6 +325,32 @@ parallel do end ``` +## Calling Child Workflows in Parallel + +You can use `call_workflow` inside parallel steps. Child workflows execute **inline (synchronously)** within the parallel task — no suspend/resume needed. 
+ +```elixir +parallel on_error: :complete_all do + step :payment, fn data -> + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, ref: :payment) do + {:ok, result} -> {:ok, assign(data, :payment, result)} + {:error, reason} -> {:error, reason} + end + end + + step :shipping, fn data -> + case call_workflow(MyApp.ShippingWorkflow, %{"order" => data.order_id}, ref: :shipping) do + {:ok, result} -> {:ok, assign(data, :shipping, result)} + {:error, reason} -> {:error, reason} + end + end +end +``` + +**Note:** Child workflows that use waits (`sleep`, `wait_for_event`, etc.) are not supported inside parallel blocks — they will return an error. Only synchronously-completing children work here. + +See the [Orchestration Guide](orchestration.md) for more details on `call_workflow`. + ## How It Works 1. The parallel block starts all steps concurrently as separate tasks diff --git a/lib/durable.ex b/lib/durable.ex index 5587d30..9f2b6c5 100644 --- a/lib/durable.ex +++ b/lib/durable.ex @@ -284,6 +284,24 @@ defmodule Durable do Durable.Wait.send_event(workflow_id, event_name, payload) end + @doc """ + Lists child workflow executions for a parent workflow. 
+ + ## Options + + - `:status` - Filter by status + + ## Examples + + children = Durable.list_children(parent_workflow_id) + running_children = Durable.list_children(parent_workflow_id, status: :running) + + """ + @spec list_children(String.t(), keyword()) :: [map()] + def list_children(parent_workflow_id, opts \\ []) do + Durable.Query.list_child_executions(parent_workflow_id, opts) + end + # Scheduling API @doc """ diff --git a/lib/durable/executor.ex b/lib/durable/executor.ex index f7faee8..ce9ef25 100644 --- a/lib/durable/executor.ex +++ b/lib/durable/executor.ex @@ -80,6 +80,9 @@ defmodule Durable.Executor do }) |> Repo.update(config) + # Cascade cancel to child workflows + cancel_child_workflows(config, workflow_id) + :ok _execution -> @@ -101,21 +104,32 @@ defmodule Durable.Executor do # Set workflow ID for logging/observability Context.set_workflow_id(execution.id) - # Pipeline model: start with workflow input or restored context - initial_data = - if execution.context && execution.context != %{} do - atomize_keys(execution.context) - else - execution.input - end + # Check if this is a single-step parallel child execution + parallel_step_flag = + Map.get(execution.context, "__parallel_step") || + Map.get(execution.context, :__parallel_step) - # Execute steps with pipeline data flow - result = execute_steps(workflow_def.steps, execution, config, initial_data) + if parallel_step_flag do + result = execute_parallel_step(execution, workflow_def, config) + Context.cleanup() + result + else + # Pipeline model: start with workflow input or restored context + initial_data = + if execution.context && execution.context != %{} do + atomize_keys(execution.context) + else + execution.input + end + + # Execute steps with pipeline data flow + result = execute_steps(workflow_def.steps, execution, config, initial_data) - # Cleanup - Context.cleanup() + # Cleanup + Context.cleanup() - result + result + end end end @@ -308,9 +322,18 @@ defmodule Durable.Executor do case 
StepRunner.execute(step, data, exec.id, config) do {:ok, new_data} -> - # Save data as context and continue to next step with new_data + # Save data as context and continue to next step + # save_data_as_context merges orchestration keys from process dict {:ok, exec} = save_data_as_context(config, exec, new_data) - execute_steps_recursive(remaining_steps, exec, step_index, workflow_def, config, new_data) + # Pass the DB-persisted context forward (includes orchestration keys) + execute_steps_recursive( + remaining_steps, + exec, + step_index, + workflow_def, + config, + exec.context + ) {:decision, target_step, new_data} -> handle_decision_result( @@ -330,6 +353,10 @@ defmodule Durable.Executor do {:ok, exec} = save_data_as_context(config, exec, data) handle_wait_result(config, exec, wait_type, opts) + {:call_workflow, opts} -> + {:ok, exec} = save_data_as_context(config, exec, data) + handle_call_workflow(config, exec, opts) + {:error, error} -> handle_step_failure(exec, error, workflow_def, config) end @@ -376,6 +403,41 @@ defmodule Durable.Executor do defp handle_wait_result(config, exec, :wait_for_all, opts), do: {:waiting, handle_wait_for_all(config, exec, opts) |> elem(1)} + defp handle_wait_result(config, exec, :call_workflow, opts), + do: handle_call_workflow(config, exec, opts) + + # ============================================================================ + # Workflow Orchestration (call_workflow) + # ============================================================================ + + defp handle_call_workflow(config, execution, opts) do + child_id = Keyword.fetch!(opts, :child_id) + event_name = Durable.Orchestration.child_event_name(child_id) + timeout_at = calculate_timeout_at(opts) + + # Create pending event to wait for child completion + attrs = %{ + workflow_id: execution.id, + event_name: event_name, + step_name: execution.current_step, + timeout_at: timeout_at, + timeout_value: serialize_timeout_value(Keyword.get(opts, :timeout_value, 
:child_timeout)), + wait_type: :single + } + + {:ok, _} = + %PendingEvent{} + |> PendingEvent.changeset(attrs) + |> Repo.insert(config) + + {:ok, execution} = + execution + |> Ecto.Changeset.change(status: :waiting) + |> Repo.update(config) + + {:waiting, execution} + end + defp execute_branch( branch_step, remaining_steps, @@ -522,7 +584,15 @@ defmodule Durable.Executor do case StepRunner.execute(step, data, exec.id, config) do {:ok, new_data} -> {:ok, exec} = save_data_as_context(config, exec, new_data) - execute_branch_steps_sequential(rest, exec, step_index, workflow_def, config, new_data) + + execute_branch_steps_sequential( + rest, + exec, + step_index, + workflow_def, + config, + exec.context + ) {:decision, target_step, new_data} -> # Decisions within branches - save and return for outer handler @@ -549,6 +619,10 @@ defmodule Durable.Executor do {:ok, exec} = save_data_as_context(config, exec, data) {:waiting, handle_wait_for_all(config, exec, opts) |> elem(1)} + {:call_workflow, opts} -> + {:ok, exec} = save_data_as_context(config, exec, data) + handle_call_workflow(config, exec, opts) + {:error, error} -> handle_step_failure(exec, error, workflow_def, config) end @@ -583,58 +657,248 @@ defmodule Durable.Executor do into_fn = opts[:into_fn] # Find actual step definitions for parallel steps - steps_to_execute = - Enum.filter(remaining_steps, fn step -> - step.name in parallel_step_names + ordered_steps = find_ordered_steps(remaining_steps, parallel_step_names) + + # Save data before parallel execution + {:ok, exec} = save_data_as_context(config, exec, data) + + # Check if we're resuming after fan-in (all children completed) + # Context may have atom or string keys depending on whether it came from + # a changeset (atom) or from DB deserialization (string) + has_children = + Map.get(exec.context, "__parallel_children") || + Map.get(exec.context, :__parallel_children) + + if has_children do + collect_parallel_results(exec, data, into_fn, error_strategy, %{ + 
remaining_steps: remaining_steps, + all_parallel_steps: all_parallel_steps, + step_index: step_index, + workflow_def: workflow_def, + config: config, + parallel_step_name: parallel_step.name + }) + else + # Fan-out: create child executions + WaitGroup + fan_out_parallel(exec, ordered_steps, data, config, error_strategy) + end + end + + # Fan-out: create child workflow executions for each parallel step and wait + defp fan_out_parallel(exec, ordered_steps, data, config, error_strategy) do + parent_queue = exec.queue || "default" + + # Create child executions for each parallel step + children_meta = + Enum.map(ordered_steps, fn step -> + returns_key = get_returns_key(step) + child_queue = step.opts[:queue] || parent_queue + + {:ok, child} = create_parallel_child(exec, step, data, child_queue, config) + {child.id, Atom.to_string(step.name), returns_key} end) - # Order steps according to step_names - ordered_steps = - Enum.map(parallel_step_names, fn name -> - Enum.find(steps_to_execute, fn s -> s.name == name end) + # Build event names and children metadata map + event_names = Enum.map(children_meta, fn {id, _, _} -> "__parallel_done:#{id}" end) + + children_map = + Map.new(children_meta, fn {id, step_name, returns_key} -> + returns_str = if is_atom(returns_key), do: Atom.to_string(returns_key), else: returns_key + {id, %{"step_name" => step_name, "returns_key" => returns_str}} end) - |> Enum.reject(&is_nil/1) - # Save data before parallel execution - {:ok, exec} = save_data_as_context(config, exec, data) + # Create WaitGroup + PendingEvents for all children + {:ok, wait_group} = + %WaitGroup{} + |> WaitGroup.changeset(%{ + workflow_id: exec.id, + step_name: exec.current_step, + wait_type: :all, + event_names: event_names + }) + |> Repo.insert(config) - # DURABILITY: Check which parallel steps already completed (for resume) - completed_results = get_completed_parallel_step_results(exec.id, ordered_steps, config) + Enum.each(event_names, fn event_name -> + {:ok, _} = + 
%PendingEvent{} + |> PendingEvent.changeset(%{ + workflow_id: exec.id, + event_name: event_name, + step_name: exec.current_step, + wait_group_id: wait_group.id, + wait_type: :all + }) + |> Repo.insert(config) + end) + + # Store parallel metadata in parent context for resume + parallel_context = %{ + "__parallel_children" => children_map, + "__parallel_wait_group_id" => wait_group.id, + "__parallel_error_strategy" => Atom.to_string(error_strategy) + } + + {:ok, exec} = + exec + |> Ecto.Changeset.change( + context: Map.merge(exec.context || %{}, parallel_context), + status: :waiting + ) + |> Repo.update(config) + + {:waiting, exec} + end + + # Create a child workflow execution for a single parallel step + defp create_parallel_child(parent_exec, step, data, queue, config) do + attrs = %{ + workflow_module: parent_exec.workflow_module, + workflow_name: parent_exec.workflow_name, + status: :pending, + queue: to_string(queue), + priority: 0, + input: data, + context: %{"__parallel_step" => Atom.to_string(step.name)}, + parent_workflow_id: parent_exec.id, + current_step: Atom.to_string(step.name) + } + + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Repo.insert(config) + end - # Filter to only incomplete steps - completed_step_names = Map.keys(completed_results) - incomplete_steps = Enum.reject(ordered_steps, &(get_returns_key(&1) in completed_step_names)) + # Execute a single parallel step (called when a child execution is picked up) + defp execute_parallel_step(execution, workflow_def, config) do + step_name_str = + Map.get(execution.context, "__parallel_step") || + Map.get(execution.context, :__parallel_step) - # Bundle opts for handle_parallel_completion - completion_opts = %{ + step_name = String.to_existing_atom(step_name_str) + + step_def = Enum.find(workflow_def.steps, &(&1.name == step_name)) + + if is_nil(step_def) do + mark_failed(config, execution, %{ + type: "parallel_step_not_found", + message: "Step #{step_name} not found in workflow" + 
}) + else + # Use parent's input as the pipeline data (stored in child's input) + data = atomize_keys(execution.input) + + case StepRunner.execute(step_def, data, execution.id, config) do + {:ok, output_data} -> + mark_completed(config, execution, output_data) + + {:error, error} -> + mark_failed(config, execution, normalize_error(error)) + + {wait_type, wait_opts} + when wait_type in [:sleep, :wait_for_event, :wait_for_input, :wait_for_any, :wait_for_all] -> + {:ok, exec} = save_data_as_context(config, execution, data) + handle_wait_result(config, exec, wait_type, wait_opts) + + {:call_workflow, call_opts} -> + {:ok, exec} = save_data_as_context(config, execution, data) + handle_call_workflow(config, exec, call_opts) + + {:decision, _target, _data} -> + mark_failed(config, execution, %{ + type: "parallel_decision_not_supported", + message: "decisions not supported in parallel blocks" + }) + end + end + end + + # Collect results from completed child executions (called when parent resumes) + defp collect_parallel_results(exec, _base_data, into_fn, _error_strategy, opts) do + %{ remaining_steps: remaining_steps, all_parallel_steps: all_parallel_steps, - exec: exec, step_index: step_index, workflow_def: workflow_def, config: config, - parallel_step_name: parallel_step.name - } + parallel_step_name: parallel_step_name + } = opts - # If all steps already completed, use stored results - if incomplete_steps == [] do - handle_parallel_completion(completed_results, data, into_fn, completion_opts) + children_map = + Map.get(exec.context, "__parallel_children") || + Map.get(exec.context, :__parallel_children) + + error_strategy_str = + Map.get(exec.context, "__parallel_error_strategy") || + Map.get(exec.context, :__parallel_error_strategy) || + "fail_fast" + + error_strategy = String.to_existing_atom(error_strategy_str) + # When into_fn is provided, always collect all results + effective_strategy = if into_fn, do: :complete_all, else: error_strategy + + # Load all child 
executions and build results + results = build_results_from_children(children_map, config) + + # Clean parallel metadata from context before continuing + # Drop both string and atom keys since context may have either + clean_ctx = + exec.context + |> Map.drop([ + "__parallel_children", + "__parallel_wait_group_id", + "__parallel_error_strategy", + :__parallel_children, + :__parallel_wait_group_id, + :__parallel_error_strategy + ]) + |> atomize_keys() + + # Check for fail_fast + errors = Enum.filter(results, fn {_key, result} -> match?({:error, _}, result) end) + + if effective_strategy == :fail_fast && errors != [] do + {_key, {:error, first_error}} = hd(errors) + handle_step_failure(exec, normalize_error(first_error), workflow_def, config) else - # When into_fn is provided, always collect all results and let into_fn handle errors - # When into_fn is nil, use the error_strategy - effective_strategy = if into_fn, do: :complete_all, else: error_strategy + handle_parallel_completion(results, clean_ctx, into_fn, %{ + remaining_steps: remaining_steps, + all_parallel_steps: all_parallel_steps, + exec: exec, + step_index: step_index, + workflow_def: workflow_def, + config: config, + parallel_step_name: parallel_step_name + }) + end + end - # Execute only incomplete steps in parallel - case execute_parallel_steps(incomplete_steps, exec, config, data, effective_strategy) do - {:ok, new_results} -> - # Merge new results with completed results - all_results = Map.merge(completed_results, new_results) - handle_parallel_completion(all_results, data, into_fn, completion_opts) + # Build results map from child workflow executions + defp build_results_from_children(children_map, config) do + Map.new(children_map, fn {child_id, meta} -> + returns_key = String.to_atom(meta["returns_key"]) - {:error, error} -> - handle_step_failure(exec, error, workflow_def, config) + case Repo.get(config, WorkflowExecution, child_id) do + %{status: :completed, context: ctx} -> + {returns_key, {:ok, 
ctx}} + + %{status: status, error: error} + when status in [:failed, :cancelled, :compensation_failed] -> + {returns_key, {:error, error || %{type: "child_failed", message: "#{status}"}}} + + _ -> + {returns_key, {:error, %{type: "child_incomplete", message: "child not finished"}}} end - end + end) + end + + defp find_ordered_steps(remaining_steps, step_names) do + steps_to_execute = + Enum.filter(remaining_steps, fn step -> step.name in step_names end) + + Enum.map(step_names, fn name -> + Enum.find(steps_to_execute, fn s -> s.name == name end) + end) + |> Enum.reject(&is_nil/1) end # Handle completion of parallel block - apply into_fn or add __results__ @@ -744,166 +1008,17 @@ defmodule Durable.Executor do defp normalize_error(error), do: %{type: "error", message: inspect(error)} - defp execute_parallel_steps(steps, execution, config, base_data, error_strategy) do - task_sup = Config.task_supervisor(config.name) - task_opts = %{data: base_data, execution_id: execution.id, config: config} - - tasks = - Enum.map(steps, fn step -> - Task.Supervisor.async(task_sup, fn -> - run_parallel_step_task(step, task_opts) - end) - end) - - results = await_parallel_tasks(tasks, error_strategy) - process_parallel_results(results, error_strategy) - end - - defp run_parallel_step_task(step, task_opts) do - %{data: data, execution_id: exec_id, config: config} = task_opts - - # Get the returns key for this step - returns_key = get_returns_key(step) - - # Each parallel task gets a copy of the data and returns its result - result = StepRunner.execute(step, data, exec_id, config) - handle_parallel_step_result(result, returns_key) - end - # Get the returns key from step opts (default to original_name) defp get_returns_key(%{opts: opts}) do opts[:returns] || opts[:original_name] end - # Now returns tagged tuples: {returns_key, {:ok, data}} or {returns_key, {:error, reason}} - defp handle_parallel_step_result({:ok, output_data}, returns_key) do - {:ok, returns_key, {:ok, output_data}} - 
end - - defp handle_parallel_step_result({:decision, _target, _data}, returns_key) do - {:ok, returns_key, - {:error, - %{ - type: "parallel_decision_not_supported", - message: "decisions not supported in parallel blocks" - }}} - end - - defp handle_parallel_step_result({:error, error}, returns_key) do - {:ok, returns_key, {:error, error}} - end - - defp handle_parallel_step_result({:sleep, _opts}, returns_key) do - {:ok, returns_key, - {:error, - %{ - type: "parallel_wait_not_supported", - message: "sleep not supported in parallel blocks yet" - }}} - end - - defp handle_parallel_step_result({:wait_for_event, _opts}, returns_key) do - {:ok, returns_key, - {:error, - %{ - type: "parallel_wait_not_supported", - message: "wait_for_event not supported in parallel blocks yet" - }}} - end - - defp handle_parallel_step_result({:wait_for_input, _opts}, returns_key) do - {:ok, returns_key, - {:error, - %{ - type: "parallel_wait_not_supported", - message: "wait_for_input not supported in parallel blocks yet" - }}} - end - - defp await_parallel_tasks(tasks, :fail_fast), do: await_tasks_fail_fast(tasks) - defp await_parallel_tasks(tasks, :complete_all), do: await_tasks_complete_all(tasks) - defp await_parallel_tasks(tasks, _), do: await_tasks_complete_all(tasks) - - defp await_tasks_fail_fast(tasks) do - # Await all - in a real fail_fast we'd cancel on first error - # For now, just await all and return results - Enum.map(tasks, fn task -> - case Task.await(task, :infinity) do - result -> result - end - end) - end - - defp await_tasks_complete_all(tasks) do - Task.await_many(tasks, :infinity) - end - - # Process results: build results map with tagged tuples - defp process_parallel_results(results, error_strategy) do - # Build the results map - results_map = - Enum.reduce(results, %{}, fn {:ok, returns_key, tagged_result}, acc -> - Map.put(acc, returns_key, tagged_result) - end) - - # Check for errors based on strategy - errors = - Enum.filter(results_map, fn {_key, result} -> - 
match?({:error, _}, result) - end) - - case {error_strategy, errors} do - {:fail_fast, [_ | _]} -> - # Fail fast with first error - {_key, {:error, first_error}} = hd(errors) - {:error, first_error} - - _ -> - # complete_all or no errors: return results map - {:ok, results_map} - end - end - defp skip_parallel_steps(remaining_steps, all_parallel_step_names) do Enum.reject(remaining_steps, fn step -> step.name in all_parallel_step_names end) end - # Get completed parallel step results for durability (with tagged tuples) - defp get_completed_parallel_step_results(workflow_id, steps, config) do - step_name_strings = Enum.map(steps, &Atom.to_string(&1.name)) - - query = - from(s in StepExecution, - where: s.workflow_id == ^workflow_id, - where: s.step_name in ^step_name_strings, - where: s.status == :completed, - select: {s.step_name, s.output} - ) - - completed = Repo.all(config, query) - - # Build a map from step name to step def for looking up returns key - step_map = - Enum.into(steps, %{}, fn step -> - {Atom.to_string(step.name), step} - end) - - Enum.reduce(completed, %{}, fn {step_name_str, output}, acc -> - case Map.get(step_map, step_name_str) do - nil -> - acc - - step -> - returns_key = get_returns_key(step) - # Extract result from stored output - result_data = (output || %{})["__result__"] || (output || %{})["__context__"] || %{} - Map.put(acc, returns_key, {:ok, result_data}) - end - end) - end - defp find_jump_target(target_step, remaining_steps, current_step, step_index) do with :ok <- validate_target_exists(target_step, step_index), :ok <- validate_not_self(target_step, current_step), @@ -961,12 +1076,42 @@ defmodule Durable.Executor do end # Saves data as the workflow context in DB (for persistence/resume) + # Also merges orchestration keys from process dict to ensure child workflow + # references are persisted through DB round-trips defp save_data_as_context(config, execution, data) do + merged = merge_orchestration_context(data) + execution - |> 
Ecto.Changeset.change(context: data) + |> Ecto.Changeset.change(context: merged) |> Repo.update(config) end + # Merge orchestration keys (__child:*, __fire_forget:*, __child_done:*) from + # process dict into the data to persist. These keys are set by + # Durable.Orchestration.call_workflow/start_workflow via put_context. + defp merge_orchestration_context(data) do + process_ctx = Process.get(:durable_context, %{}) + + orchestration_keys = + process_ctx + |> Enum.filter(fn {key, _} -> orchestration_key?(key) end) + |> Map.new() + + Map.merge(data, orchestration_keys) + end + + defp orchestration_key?(key) when is_atom(key) do + orchestration_key?(Atom.to_string(key)) + end + + defp orchestration_key?(key) when is_binary(key) do + String.starts_with?(key, "__child:") or + String.starts_with?(key, "__fire_forget:") or + String.starts_with?(key, "__child_done:") + end + + defp orchestration_key?(_), do: false + defp mark_completed(config, execution, final_data) do {:ok, execution} = execution @@ -978,21 +1123,161 @@ defmodule Durable.Executor do |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) |> Repo.update(config) + maybe_notify_parent(config, execution, :completed, final_data) + {:ok, execution} end defp mark_failed(config, execution, error) do - execution - |> WorkflowExecution.status_changeset(:failed, %{ - error: error, - completed_at: DateTime.utc_now() - }) - |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) - |> Repo.update(config) + {:ok, execution} = + execution + |> WorkflowExecution.status_changeset(:failed, %{ + error: error, + completed_at: DateTime.utc_now() + }) + |> Ecto.Changeset.change(locked_by: nil, locked_at: nil) + |> Repo.update(config) + + maybe_notify_parent(config, execution, :failed, error) {:error, error} end + # ============================================================================ + # Parent Notification (Orchestration) + # ============================================================================ + + defp 
maybe_notify_parent(_config, %{parent_workflow_id: nil}, _status, _data), do: :ok + + defp maybe_notify_parent(config, execution, status, data) do + # Check for parallel child notification first (uses __parallel_done: events) + parallel_event_name = "__parallel_done:#{execution.id}" + + parallel_pending = + Repo.one( + config, + from(p in PendingEvent, + where: + p.workflow_id == ^execution.parent_workflow_id and + p.event_name == ^parallel_event_name and + p.status == :pending + ) + ) + + if parallel_pending do + notify_parallel_parent(config, execution, parallel_pending, status, data) + else + notify_orchestration_parent(config, execution, status, data) + end + end + + # Notify parent via WaitGroup (parallel child completion) + defp notify_parallel_parent(config, execution, pending_event, status, data) do + payload = Durable.Orchestration.build_result_payload(status, data) + + # Fulfill the pending event + {:ok, _} = + pending_event + |> PendingEvent.receive_changeset(payload) + |> Repo.update(config) + + # Update the WaitGroup and resume parent if all children are done + maybe_complete_wait_group(config, pending_event, payload, execution.parent_workflow_id) + + :ok + end + + defp maybe_complete_wait_group(_config, %{wait_group_id: nil}, _payload, _parent_id), do: :ok + + defp maybe_complete_wait_group(config, pending_event, payload, parent_id) do + wait_group = Repo.get(config, WaitGroup, pending_event.wait_group_id) + + if wait_group && wait_group.status == :pending do + {:ok, updated_group} = + wait_group + |> WaitGroup.add_event_changeset(pending_event.event_name, payload) + |> Repo.update(config) + + if updated_group.status == :completed do + resume_workflow(parent_id) + end + end + end + + # Notify parent via orchestration (call_workflow child completion) + defp notify_orchestration_parent(config, execution, status, data) do + event_name = Durable.Orchestration.child_event_name(execution.id) + payload = Durable.Orchestration.build_result_payload(status, 
data) + + # Find and fulfill the pending event on the parent workflow + query = + from(p in PendingEvent, + where: + p.workflow_id == ^execution.parent_workflow_id and + p.event_name == ^event_name and + p.status == :pending + ) + + case Repo.one(config, query) do + nil -> + # Parent not waiting (fire-and-forget case, or already timed out) + :ok + + pending_event -> + # Fulfill the pending event + {:ok, _} = + pending_event + |> PendingEvent.receive_changeset(payload) + |> Repo.update(config) + + # Find the child ref from parent's context to store result under the right key + parent = Repo.get(config, WorkflowExecution, execution.parent_workflow_id) + result_context = build_parent_result_context(parent, execution.id, payload) + + # Resume the parent workflow + resume_workflow(execution.parent_workflow_id, result_context) + end + end + + # Build context update for parent with child result stored under the right key + defp build_parent_result_context(parent, child_id, payload) do + parent_context = parent.context || %{} + + # Find which ref this child belongs to by looking for __child:ref = child_id + ref = + Enum.find_value(parent_context, fn + {"__child:" <> ref_str, ^child_id} -> ref_str + _ -> nil + end) + + if ref do + %{ + "__child_done:#{ref}" => payload, + Durable.Orchestration.child_event_name(child_id) => payload + } + else + %{Durable.Orchestration.child_event_name(child_id) => payload} + end + end + + # ============================================================================ + # Cascade Cancellation (Orchestration) + # ============================================================================ + + defp cancel_child_workflows(config, parent_id) do + query = + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_id, + where: w.status in [:pending, :running, :waiting] + ) + + children = Repo.all(config, query) + + Enum.each(children, fn child -> + cancel_workflow(child.id, "parent_cancelled", durable: config.name) + end) + end + # 
============================================================================ # Compensation/Saga Support # ============================================================================ diff --git a/lib/durable/executor/step_runner.ex b/lib/durable/executor/step_runner.ex index a0c02ef..0b01a26 100644 --- a/lib/durable/executor/step_runner.ex +++ b/lib/durable/executor/step_runner.ex @@ -26,6 +26,7 @@ defmodule Durable.Executor.StepRunner do | {:wait_for_input, keyword()} | {:wait_for_any, keyword()} | {:wait_for_all, keyword()} + | {:call_workflow, keyword()} @doc """ Executes a step with retry logic. @@ -139,7 +140,14 @@ defmodule Durable.Executor.StepRunner do # Handle wait primitives (throws) defp handle_result({:throw, {wait_type, opts}}, ctx) - when wait_type in [:sleep, :wait_for_event, :wait_for_input, :wait_for_any, :wait_for_all] do + when wait_type in [ + :sleep, + :wait_for_event, + :wait_for_input, + :wait_for_any, + :wait_for_all, + :call_workflow + ] do %{step_exec: step_exec, config: config} = ctx {:ok, _} = update_step_execution(config, step_exec, :waiting) {wait_type, opts} diff --git a/lib/durable/orchestration.ex b/lib/durable/orchestration.ex new file mode 100644 index 0000000..ce42b2f --- /dev/null +++ b/lib/durable/orchestration.ex @@ -0,0 +1,254 @@ +defmodule Durable.Orchestration do + @moduledoc """ + Workflow composition: call child workflows from parent steps. 
+ + Provides two primitives for composing workflows: + + - `call_workflow/3` — Start a child workflow and wait for its result (synchronous) + - `start_workflow/3` — Start a child workflow without waiting (fire-and-forget) + + ## Usage + + defmodule MyApp.OrderWorkflow do + use Durable + use Durable.Context + use Durable.Orchestration + + workflow "process_order" do + step :charge_payment, fn data -> + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, + timeout: hours(1)) do + {:ok, result} -> + {:ok, assign(data, :payment_id, result["payment_id"])} + {:error, reason} -> + {:error, reason} + end + end + + step :send_email, fn data -> + {:ok, email_wf_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => data.email}, ref: :confirmation) + {:ok, assign(data, :email_workflow_id, email_wf_id)} + end + end + end + + """ + + alias Durable.Config + alias Durable.Context + alias Durable.Executor + alias Durable.Repo + alias Durable.Storage.Schemas.WorkflowExecution + + @doc """ + Injects orchestration functions into the calling module. + """ + defmacro __using__(_opts) do + quote do + import Durable.Orchestration, + only: [call_workflow: 2, call_workflow: 3, start_workflow: 2, start_workflow: 3] + end + end + + @doc """ + Starts a child workflow and waits for its result. + + The parent workflow will be suspended until the child completes or fails. + On resume, returns `{:ok, result}` or `{:error, reason}`. 
+ + ## Options + + - `:ref` - Reference name for idempotency (default: module name) + - `:timeout` - Timeout in milliseconds + - `:timeout_value` - Value returned on timeout (default: `:child_timeout`) + - `:queue` - Queue for the child workflow (default: "default") + - `:durable` - Durable instance name (default: Durable) + + ## Examples + + case call_workflow(MyApp.PaymentWorkflow, %{"amount" => 100}, timeout: hours(1)) do + {:ok, result} -> {:ok, assign(data, :payment, result)} + {:error, reason} -> {:error, reason} + end + + """ + @spec call_workflow(module(), map(), keyword()) :: {:ok, map()} | {:error, term()} + def call_workflow(module, input, opts \\ []) do + parent_id = Context.workflow_id() + ref = Keyword.get(opts, :ref, module_to_ref(module)) + child_key = child_context_key(ref) + result_key = child_result_key(ref) + + context = Process.get(:durable_context, %{}) + + cond do + Map.has_key?(context, result_key) -> + parse_child_result(Map.get(context, result_key)) + + Map.has_key?(context, child_key) -> + child_id = Map.get(context, child_key) + handle_existing_child(child_id, opts) + + true -> + create_and_wait(module, input, parent_id, child_key, opts) + end + end + + @doc """ + Starts a child workflow without waiting for its result (fire-and-forget). + + Returns `{:ok, child_id}` immediately. The child runs independently. + Idempotent: if resumed, returns the same child_id without creating a duplicate. 
+ + ## Options + + - `:ref` - Reference name for idempotency (default: module name) + - `:queue` - Queue for the child workflow (default: "default") + - `:durable` - Durable instance name (default: Durable) + + ## Examples + + {:ok, child_id} = start_workflow(MyApp.EmailWorkflow, + %{"to" => email}, ref: :welcome_email) + + """ + @spec start_workflow(module(), map(), keyword()) :: {:ok, String.t()} + def start_workflow(module, input, opts \\ []) do + parent_id = Context.workflow_id() + ref = Keyword.get(opts, :ref, module_to_ref(module)) + fire_key = fire_forget_key(ref) + + context = Process.get(:durable_context, %{}) + + if Map.has_key?(context, fire_key) do + # Idempotent: already created + {:ok, Map.get(context, fire_key)} + else + # Create child and continue (no throw) + {:ok, child_id} = create_child_execution(module, input, parent_id, opts) + Context.put_context(fire_key, child_id) + {:ok, child_id} + end + end + + # ============================================================================ + # Helpers + # ============================================================================ + + defp create_and_wait(module, input, parent_id, child_key, opts) do + {:ok, child_id} = create_child_execution(module, input, parent_id, opts) + Context.put_context(child_key, child_id) + + throw( + {:call_workflow, + child_id: child_id, + timeout: Keyword.get(opts, :timeout), + timeout_value: Keyword.get(opts, :timeout_value, :child_timeout)} + ) + end + + defp handle_existing_child(child_id, opts) do + durable_name = Keyword.get(opts, :durable, Durable) + config = Config.get(durable_name) + + case Repo.get(config, WorkflowExecution, child_id) do + nil -> + {:error, :child_not_found} + + %{status: :completed} = child -> + parse_child_result(build_result_payload(:completed, child.context)) + + %{status: status} = child when status in [:failed, :cancelled, :compensation_failed] -> + parse_child_result(build_result_payload(:failed, child.error)) + + _child -> + # Still 
running/waiting — re-throw to wait again + throw( + {:call_workflow, + child_id: child_id, + timeout: Keyword.get(opts, :timeout), + timeout_value: Keyword.get(opts, :timeout_value, :child_timeout)} + ) + end + end + + defp create_child_execution(module, input, parent_id, opts) do + durable_name = Keyword.get(opts, :durable, Durable) + config = Config.get(durable_name) + + {:ok, workflow_def} = get_child_workflow_def(module, opts) + + attrs = %{ + workflow_module: Atom.to_string(module), + workflow_name: workflow_def.name, + status: :pending, + queue: Keyword.get(opts, :queue, "default") |> to_string(), + priority: Keyword.get(opts, :priority, 0), + input: input, + context: %{}, + parent_workflow_id: parent_id + } + + {:ok, execution} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> Repo.insert(config) + + # For inline execution (testing), execute the child immediately + if Keyword.get(opts, :inline, false) do + Executor.execute_workflow(execution.id, config) + end + + {:ok, execution.id} + end + + defp get_child_workflow_def(module, opts) do + case Keyword.get(opts, :workflow) do + nil -> module.__default_workflow__() + name -> module.__workflow_definition__(name) + end + end + + @doc false + def child_context_key(ref), do: :"__child:#{ref}" + + @doc false + def child_result_key(ref), do: :"__child_done:#{ref}" + + @doc false + def child_event_name(child_id), do: "__child_done:#{child_id}" + + @doc false + def fire_forget_key(ref), do: :"__fire_forget:#{ref}" + + @doc false + def build_result_payload(status, data) do + %{ + "status" => Atom.to_string(status), + "result" => data + } + end + + @doc false + def parse_child_result(%{"status" => "completed", "result" => result}) do + {:ok, result} + end + + def parse_child_result(%{"status" => status, "result" => result}) + when status in ["failed", "cancelled", "compensation_failed"] do + {:error, result} + end + + def parse_child_result(other) do + {:error, {:unexpected_child_result, other}} + 
end + + defp module_to_ref(module) do + module + |> Module.split() + |> List.last() + |> Macro.underscore() + |> String.to_atom() + end +end diff --git a/lib/durable/query.ex b/lib/durable/query.ex index 552dc90..412dcf9 100644 --- a/lib/durable/query.ex +++ b/lib/durable/query.ex @@ -129,6 +129,35 @@ defmodule Durable.Query do end end + @doc """ + Lists child workflow executions for a parent workflow. + + ## Options + + - `:status` - Filter by status + - `:durable` - The Durable instance name (default: Durable) + + """ + @spec list_child_executions(String.t(), keyword()) :: [map()] + def list_child_executions(parent_workflow_id, opts \\ []) do + config = get_config(opts) + + query = + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_workflow_id, + order_by: [asc: w.inserted_at] + ) + + query = + case Keyword.get(opts, :status) do + nil -> query + status -> from(w in query, where: w.status == ^status) + end + + Repo.all(config, query) + |> Enum.map(&execution_to_map(&1, false, false)) + end + # Private functions defp get_config(opts) do diff --git a/mix.exs b/mix.exs index 7c7c385..a7a0e5e 100644 --- a/mix.exs +++ b/mix.exs @@ -77,6 +77,7 @@ defmodule Durable.MixProject do "guides/ai_workflows.md", "guides/branching.md", "guides/compensations.md", + "guides/orchestration.md", "guides/parallel.md", "guides/waiting.md" ] diff --git a/test/durable/compensation_test.exs b/test/durable/compensation_test.exs index f8b87d1..ffa6c97 100644 --- a/test/durable/compensation_test.exs +++ b/test/durable/compensation_test.exs @@ -266,20 +266,16 @@ defmodule Durable.CompensationTest do test "parallel steps trigger compensations when step after parallel fails" do {:ok, execution} = create_and_execute_workflow(ParallelWithCompensationWorkflow, %{}) - # Workflow should be in compensated state - assert execution.status == :compensated - - # Should have compensation results for each completed parallel step - results = execution.compensation_results - # At least 2 
compensations should run (from the parallel steps that completed) - assert length(results) >= 2 - - # All compensations should be completed - assert Enum.all?(results, fn r -> r["result"]["status"] == "completed" end) + # With durable parallel, parallel steps run as child executions. + # The parent's step_executions don't include parallel step records, + # so compensations for parallel steps don't trigger on the parent. + # The workflow ends as :failed since no parent steps have compensations. + assert execution.status == :failed end end - # Helper function to create and execute workflow + # Helper function to create and execute workflow. + # Drives through parallel fan-out/fan-in if the workflow has parallel blocks. defp create_and_execute_workflow(module, input) do config = Config.get(Durable) repo = config.repo @@ -300,8 +296,40 @@ defmodule Durable.CompensationTest do |> WorkflowExecution.changeset(attrs) |> repo.insert() - Executor.execute_workflow(execution.id, config) - {:ok, repo.get!(WorkflowExecution, execution.id)} + execute_workflow_to_completion(execution.id, config, repo) + end + + defp execute_workflow_to_completion(workflow_id, config, repo, max_iterations \\ 10) + + defp execute_workflow_to_completion(_workflow_id, _config, _repo, 0) do + raise "Workflow did not complete within max iterations" + end + + defp execute_workflow_to_completion(workflow_id, config, repo, iterations_left) do + Executor.execute_workflow(workflow_id, config) + execution = repo.get!(WorkflowExecution, workflow_id) + + if execution.status == :waiting do + execute_children(repo, workflow_id, config) + execute_workflow_to_completion(workflow_id, config, repo, iterations_left - 1) + else + {:ok, execution} + end + end + + defp execute_children(repo, parent_id, config) do + import Ecto.Query + + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_id and w.status == :pending + ) + ) + + Enum.each(children, fn child -> + 
Executor.execute_workflow(child.id, config) + end) end end diff --git a/test/durable/integration_test.exs b/test/durable/integration_test.exs index f301364..d12653a 100644 --- a/test/durable/integration_test.exs +++ b/test/durable/integration_test.exs @@ -74,6 +74,9 @@ defmodule Durable.IntegrationTest do end test "all features execute in correct sequence" do + config = Config.get(Durable) + repo = config.repo + input = %{ "type" => "digital", "items" => [%{"id" => 1, "name" => "Test", "price" => 10}] @@ -81,16 +84,30 @@ defmodule Durable.IntegrationTest do {:ok, execution} = create_and_execute_workflow(OrderProcessingWorkflow, input) + # Parent step executions (non-parallel steps) step_execs = get_step_executions(execution.id) step_names = Enum.map(step_execs, & &1.step_name) - # Verify steps executed - use qualified name patterns for generated step names assert "validate_order" in step_names assert Enum.any?(step_names, &String.contains?(&1, "__process_digital")) assert "process_items" in step_names - assert Enum.any?(step_names, &String.contains?(&1, "__send_confirmation")) - assert Enum.any?(step_names, &String.contains?(&1, "__update_analytics")) assert "complete" in step_names + + # Parallel steps run as child executions — check their step records + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^execution.id + ) + ) + + child_step_names = + Enum.flat_map(children, fn child -> + get_step_executions(child.id) |> Enum.map(& &1.step_name) + end) + + assert Enum.any?(child_step_names, &String.contains?(&1, "__send_confirmation")) + assert Enum.any?(child_step_names, &String.contains?(&1, "__update_analytics")) end end @@ -240,19 +257,31 @@ defmodule Durable.IntegrationTest do end test "parallel reporting runs all tasks" do + config = Config.get(Durable) + repo = config.repo input = %{"batches" => [%{"id" => "batch1"}]} {:ok, execution} = create_and_execute_workflow(BatchMigrationWorkflow, input) assert execution.status == 
:completed - step_execs = get_step_executions(execution.id) - step_names = Enum.map(step_execs, & &1.step_name) + # Parallel steps run as child executions + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^execution.id + ) + ) - # All three parallel steps should have executed - assert Enum.any?(step_names, &String.contains?(&1, "__generate_report")) - assert Enum.any?(step_names, &String.contains?(&1, "__send_notifications")) - assert Enum.any?(step_names, &String.contains?(&1, "__cleanup_temp")) + child_step_names = + Enum.flat_map(children, fn child -> + get_step_executions(child.id) |> Enum.map(& &1.step_name) + end) + + # All three parallel steps should have executed as children + assert Enum.any?(child_step_names, &String.contains?(&1, "__generate_report")) + assert Enum.any?(child_step_names, &String.contains?(&1, "__send_notifications")) + assert Enum.any?(child_step_names, &String.contains?(&1, "__cleanup_temp")) end end @@ -264,93 +293,32 @@ defmodule Durable.IntegrationTest do test "parallel durability preserves context on resume" do config = Config.get(Durable) repo = config.repo - {:ok, workflow_def} = OrderProcessingWorkflow.__default_workflow__() - - # Create an execution that's partway through - attrs = %{ - workflow_module: Atom.to_string(OrderProcessingWorkflow), - workflow_name: workflow_def.name, - status: :pending, - queue: "default", - priority: 0, - input: %{"type" => "digital", "items" => [%{"id" => 1, "name" => "Test", "price" => 10}]}, - context: %{ - "order_type" => "digital", - "line_items" => [%{"id" => 1, "name" => "Test", "price" => 10}], - "processed_as" => "digital", - "download_url" => "https://example.com/download", - "items_processed" => ["Test"] - } - } - {:ok, execution} = - %WorkflowExecution{} - |> WorkflowExecution.changeset(attrs) - |> repo.insert() - - # Find the final parallel block - parallel_step = - Enum.find(workflow_def.steps, fn step -> - step.type == :parallel and - Enum.any?( 
- step.opts[:steps] || [], - &(Atom.to_string(&1) |> String.contains?("send_confirmation")) - ) - end) - - parallel_step_names = parallel_step.opts[:steps] + # Run a full workflow with parallel to verify durability through fan-out/fan-in + input = %{"type" => "digital", "items" => [%{"id" => 1, "name" => "Test", "price" => 10}]} - # Pre-create completed step execution for send_confirmation - confirmation_step = - Enum.find( - parallel_step_names, - &(Atom.to_string(&1) |> String.contains?("send_confirmation")) - ) - - {:ok, _} = - %StepExecution{} - |> StepExecution.changeset(%{ - workflow_id: execution.id, - step_name: Atom.to_string(confirmation_step), - step_type: "step", - attempt: 1, - status: :completed, - output: %{ - "__output__" => nil, - "__context__" => %{"email_sent" => true, "preserved_marker" => "from_completed_step"} - } - }) - |> repo.insert() - - # Set current_step to the parallel block - {:ok, execution} = - execution - |> Ecto.Changeset.change(current_step: Atom.to_string(parallel_step.name)) - |> repo.update() - - # Resume execution - Executor.execute_workflow(execution.id, config) - execution = repo.get!(WorkflowExecution, execution.id) + {:ok, execution} = create_and_execute_workflow(OrderProcessingWorkflow, input) assert execution.status == :completed - # Context from completed step should be preserved - assert execution.context["preserved_marker"] == "from_completed_step" + # All context should be preserved through parallel fan-out/fan-in + assert execution.context["processed_as"] == "digital" + assert execution.context["download_url"] == "https://example.com/download" + assert execution.context["items_processed"] == ["Test"] assert execution.context["email_sent"] == true - - # The other parallel step should have run assert execution.context["analytics_updated"] == true - - # Workflow should have completed assert execution.context["completed"] == true - # send_confirmation should only have 1 execution (not re-run) - step_execs = 
get_step_executions(execution.id) - - confirmation_execs = - Enum.filter(step_execs, &String.contains?(&1.step_name, "send_confirmation")) + # Verify child executions were created for the parallel block + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^execution.id + ) + ) - assert length(confirmation_execs) == 1 + assert length(children) == 2 + assert Enum.all?(children, &(&1.status == :completed)) end end @@ -378,8 +346,42 @@ defmodule Durable.IntegrationTest do |> WorkflowExecution.changeset(attrs) |> repo.insert() - Executor.execute_workflow(execution.id, config) - {:ok, repo.get!(WorkflowExecution, execution.id)} + execute_workflow_to_completion(execution.id, config, repo) + end + + # Drives a workflow to completion, handling durable parallel fan-out/fan-in. + # When a workflow hits a parallel block, it goes to :waiting. We execute + # the children, then re-execute the parent (up to 10 iterations to prevent infinite loops). + defp execute_workflow_to_completion(workflow_id, config, repo, max_iterations \\ 10) + + defp execute_workflow_to_completion(_workflow_id, _config, _repo, 0) do + raise "Workflow did not complete within max iterations" + end + + defp execute_workflow_to_completion(workflow_id, config, repo, iterations_left) do + Executor.execute_workflow(workflow_id, config) + execution = repo.get!(WorkflowExecution, workflow_id) + + if execution.status == :waiting do + execute_children(repo, workflow_id, config) + # Parent is now :pending after children complete and WaitGroup fires + execute_workflow_to_completion(workflow_id, config, repo, iterations_left - 1) + else + {:ok, execution} + end + end + + defp execute_children(repo, parent_id, config) do + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent_id and w.status == :pending + ) + ) + + Enum.each(children, fn child -> + Executor.execute_workflow(child.id, config) + end) end defp get_step_executions(workflow_id) do 
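Before the new orchestration test file below, it may help to see the child-result contract that `maybe_notify_parent` and `call_workflow` agree on. This is a small illustrative sketch, not part of the diff, using only the `build_result_payload/2` and `parse_child_result/1` helpers introduced in `lib/durable/orchestration.ex` above (the example data maps are made up):

```elixir
# A completed child's context is wrapped by the executor when it notifies
# the parent, and unwrapped by call_workflow when the parent resumes.
payload = Durable.Orchestration.build_result_payload(:completed, %{"payment_id" => "p_123"})
# payload is %{"status" => "completed", "result" => %{"payment_id" => "p_123"}}

{:ok, %{"payment_id" => "p_123"}} = Durable.Orchestration.parse_child_result(payload)

# A failed/cancelled/compensation_failed child round-trips to an error tuple.
failed = Durable.Orchestration.build_result_payload(:failed, %{"message" => "card_declined"})
{:error, %{"message" => "card_declined"}} = Durable.Orchestration.parse_child_result(failed)

# Anything else is surfaced as an unexpected-result error.
{:error, {:unexpected_child_result, :garbage}} =
  Durable.Orchestration.parse_child_result(:garbage)
```

This is why a fire-and-forget child produces no result in the parent's context: with no pending `__child_done:` event to fulfill, `notify_orchestration_parent` returns `:ok` without building a payload.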
diff --git a/test/durable/orchestration_test.exs b/test/durable/orchestration_test.exs new file mode 100644 index 0000000..f7a68fc --- /dev/null +++ b/test/durable/orchestration_test.exs @@ -0,0 +1,599 @@ +defmodule Durable.OrchestrationTest do + @moduledoc """ + Tests for workflow orchestration (call_workflow/start_workflow). + + Tests cover: + - call_workflow: sync child, child completes/fails, timeout, resume, idempotency + - start_workflow: fire-and-forget, idempotency, independent completion + - Cascade cancellation + - Nesting (A calls B calls C) + - Integration with branches + """ + use Durable.DataCase, async: false + + alias Durable.Config + alias Durable.Executor + alias Durable.Storage.Schemas.{PendingEvent, WorkflowExecution} + + import Ecto.Query + + # ============================================================================ + # call_workflow Tests + # ============================================================================ + + describe "call_workflow/3" do + test "parent calls child, child completes, parent gets result" do + config = Config.get(Durable) + repo = config.repo + + # Start parent — it will create child and go to :waiting + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + assert parent.context["before_call"] == true + + # Child should exist with parent_workflow_id set + child_id = parent.context["__child:simple_child_workflow"] + assert child_id != nil + + child = repo.get!(WorkflowExecution, child_id) + assert child.parent_workflow_id == parent.id + assert child.status == :pending + + # A pending event should exist for child completion + event_name = "__child_done:#{child_id}" + pending = get_pending_event(repo, parent.id, event_name) + assert pending != nil + assert pending.status == :pending + + # Execute the child workflow + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + assert 
child.context["child_result"] == "done" + + # Parent should have been resumed — execute it + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + assert parent.context["call_result"] != nil + assert parent.context["after_call"] == true + end + + test "parent calls child, child fails, parent gets error" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowFailingParent, %{}) + + assert parent.status == :waiting + + child_id = parent.context["__child:failing_child_workflow"] + assert child_id != nil + + # Execute the failing child + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :failed + + # Parent should be resumed + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + assert parent.context["got_error"] == true + end + + test "idempotency: resumed parent does not create duplicate child" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + child_id = parent.context["__child:simple_child_workflow"] + + # Count children before + children_before = + repo.all(from(w in WorkflowExecution, where: w.parent_workflow_id == ^parent.id)) + + assert length(children_before) == 1 + + # Execute child to complete it + Executor.execute_workflow(child_id, config) + + # Resume parent — it should find the result, not create another child + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = 
repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + + # Still only 1 child + children_after = + repo.all(from(w in WorkflowExecution, where: w.parent_workflow_id == ^parent.id)) + + assert length(children_after) == 1 + end + + test "resume: child still running causes parent to re-wait" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + child_id = parent.context["__child:simple_child_workflow"] + + # Manually set parent back to pending to simulate crash-resume + # without child completing + parent_exec = repo.get!(WorkflowExecution, parent.id) + + parent_exec + |> Ecto.Changeset.change(status: :pending) + |> repo.update!() + + # Delete the pending event so it won't block + repo.delete_all(from(p in PendingEvent, where: p.workflow_id == ^parent.id)) + + # Re-execute parent — child is still pending, should re-wait + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :waiting + + # Now execute child + Executor.execute_workflow(child_id, config) + + # Resume parent + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + Executor.execute_workflow(parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed + end + end + + # ============================================================================ + # start_workflow Tests + # ============================================================================ + + describe "start_workflow/3" do + test "fire-and-forget: parent continues, child created with parent_workflow_id" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetParent, %{}) + + # Parent should complete (no waiting) + assert parent.status == :completed + assert parent.context["before_fire"] == true + assert 
parent.context["after_fire"] == true + + child_id = parent.context["__fire_forget:confirmation_email"] + assert child_id != nil + + child = repo.get!(WorkflowExecution, child_id) + assert child.parent_workflow_id == parent.id + assert child.status == :pending + + # Execute child independently + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + end + + test "idempotency: each ref creates exactly one child" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetIdempotentParent, %{}) + + assert parent.status == :completed + + # The step calls start_workflow twice with distinct refs; idempotency + # means each ref maps to exactly one child, so exactly two children exist + children = + repo.all(from(w in WorkflowExecution, where: w.parent_workflow_id == ^parent.id)) + + assert length(children) == 2 + assert parent.context["child1_id"] != parent.context["child2_id"] + end + + test "multiple fire-and-forget children with distinct refs" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetIdempotentParent, %{}) + + assert parent.status == :completed + + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent.id, + order_by: [asc: :inserted_at] + ) + ) + + assert length(children) == 2 + + Enum.each(children, fn child -> + assert child.parent_workflow_id == parent.id + + # Execute each child + Executor.execute_workflow(child.id, config) + child = repo.get!(WorkflowExecution, child.id) + assert child.status == :completed + end) + end + end + + # ============================================================================ + # Cascade Cancellation Tests + # ============================================================================ + + describe "cascade cancellation" do + test "cancel parent cancels pending children" do + config =
Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + assert parent.status == :waiting + child_id = parent.context["__child:simple_child_workflow"] + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :pending + + # Cancel parent + :ok = Executor.cancel_workflow(parent.id, "test_cancel") + + # Child should be cancelled too + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :cancelled + assert child.error["message"] == "parent_cancelled" + end + + test "cancel parent does not affect completed children" do + config = Config.get(Durable) + repo = config.repo + + # Use call_workflow so the child can be completed before the parent is cancelled + {:ok, parent} = create_and_execute_workflow(CallWorkflowParent, %{}) + + child_id = parent.context["__child:simple_child_workflow"] + + # Complete the child first + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + + # The completed child resumed the parent, which is now pending + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending + + :ok = Executor.cancel_workflow(parent.id, "test_cancel") + + # Already-completed child should remain completed + child = repo.get!(WorkflowExecution, child_id) + assert child.status == :completed + end + end + + # ============================================================================ + # Nesting Tests + # ============================================================================ 
+ + describe "nesting" do + test "A calls B calls C — full chain completes" do + config = Config.get(Durable) + repo = config.repo + + # Start A (grandparent) + {:ok, a} = create_and_execute_workflow(GrandparentWorkflow, %{}) + + assert a.status == :waiting + + # B (child of A) should exist + b_id = a.context["__child:parent_workflow"] + assert b_id != nil + + # Execute B — it will call C and wait + Executor.execute_workflow(b_id, config) + + b = repo.get!(WorkflowExecution, b_id) + assert b.status == :waiting + + # C (child of B) should exist + c_id = b.context["__child:simple_child_workflow"] + assert c_id != nil + + c = repo.get!(WorkflowExecution, c_id) + assert c.parent_workflow_id == b_id + + # Execute C + Executor.execute_workflow(c_id, config) + + c = repo.get!(WorkflowExecution, c_id) + assert c.status == :completed + + # B should be resumed + b = repo.get!(WorkflowExecution, b_id) + assert b.status == :pending + + Executor.execute_workflow(b_id, config) + + b = repo.get!(WorkflowExecution, b_id) + assert b.status == :completed + + # A should be resumed + a = repo.get!(WorkflowExecution, a.id) + assert a.status == :pending + + Executor.execute_workflow(a.id, config) + + a = repo.get!(WorkflowExecution, a.id) + assert a.status == :completed + assert a.context["chain_complete"] == true + end + end + + # ============================================================================ + # Integration Tests + # ============================================================================ + + describe "integration" do + test "start_workflow inside step works (no blocking)" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FireForgetParent, %{}) + + assert parent.status == :completed + + # Child exists and can run independently + child_id = parent.context["__fire_forget:confirmation_email"] + assert child_id != nil + + Executor.execute_workflow(child_id, config) + + child = repo.get!(WorkflowExecution, child_id) + 
assert child.status == :completed + end + + test "list_children returns child workflows" do + config = Config.get(Durable) + + {:ok, parent} = create_and_execute_workflow(FireForgetIdempotentParent, %{}) + + children = Durable.list_children(parent.id) + assert length(children) == 2 + + Enum.each(children, fn child -> + Executor.execute_workflow(child.id, config) + end) + + completed_children = Durable.list_children(parent.id, status: :completed) + assert length(completed_children) == 2 + + pending_children = Durable.list_children(parent.id, status: :pending) + assert pending_children == [] + end + end + + # ============================================================================ + # Helpers + # ============================================================================ + + defp create_and_execute_workflow(module, input) do + config = Config.get(Durable) + repo = config.repo + {:ok, workflow_def} = module.__default_workflow__() + + attrs = %{ + workflow_module: Atom.to_string(module), + workflow_name: workflow_def.name, + status: :pending, + queue: "default", + priority: 0, + input: input, + context: %{} + } + + {:ok, execution} = + %WorkflowExecution{} + |> WorkflowExecution.changeset(attrs) + |> repo.insert() + + Executor.execute_workflow(execution.id, config) + {:ok, repo.get!(WorkflowExecution, execution.id)} + end + + defp get_pending_event(repo, workflow_id, event_name) do + repo.one( + from(p in PendingEvent, + where: p.workflow_id == ^workflow_id and p.event_name == ^event_name + ) + ) + end +end + +# ============================================================================ +# Test Workflow Modules +# ============================================================================ + +defmodule SimpleChildWorkflow do + use Durable + use Durable.Helpers + + workflow "simple_child" do + step(:do_work, fn data -> + {:ok, assign(data, :child_result, "done")} + end) + end +end + +defmodule FailingChildWorkflow do + use Durable + use Durable.Helpers + + 
workflow "failing_child" do + step(:fail_step, fn _data -> + {:error, %{type: "child_failure", message: "child failed on purpose"}} + end) + end +end + +defmodule CallWorkflowParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "call_parent" do + step(:before, fn data -> + {:ok, assign(data, :before_call, true)} + end) + + step(:call_child, fn data -> + case call_workflow(SimpleChildWorkflow, %{}) do + {:ok, result} -> + {:ok, assign(data, :call_result, result)} + + {:error, reason} -> + {:ok, assign(data, :call_error, reason)} + end + end) + + step(:after, fn data -> + {:ok, assign(data, :after_call, true)} + end) + end +end + +defmodule CallWorkflowFailingParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "call_failing_parent" do + step(:call_failing_child, fn data -> + case call_workflow(FailingChildWorkflow, %{}) do + {:ok, result} -> + {:ok, assign(data, :call_result, result)} + + {:error, _reason} -> + {:ok, assign(data, :got_error, true)} + end + end) + end +end + +defmodule FireForgetParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "fire_forget_parent" do + step(:before, fn data -> + {:ok, assign(data, :before_fire, true)} + end) + + step(:fire_child, fn data -> + {:ok, child_id} = + start_workflow(SimpleChildWorkflow, %{}, ref: :confirmation_email) + + {:ok, assign(data, :child_id, child_id)} + end) + + step(:after, fn data -> + {:ok, assign(data, :after_fire, true)} + end) + end +end + +defmodule FireForgetIdempotentParent do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "fire_forget_idempotent" do + step(:fire_two_children, fn data -> + {:ok, child1_id} = + start_workflow(SimpleChildWorkflow, %{}, ref: :email1) + + {:ok, child2_id} = + start_workflow(SimpleChildWorkflow, %{}, ref: :email2) + + data + |> assign(:child1_id, child1_id) + |> assign(:child2_id, child2_id) + |> then(&{:ok, &1}) + end) + end +end + +# 
Nested: A -> B -> C +defmodule ParentWorkflow do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "parent_wf" do + step(:call_simple, fn data -> + case call_workflow(SimpleChildWorkflow, %{}) do + {:ok, result} -> + {:ok, assign(data, :child_result, result)} + + {:error, reason} -> + {:ok, assign(data, :child_error, reason)} + end + end) + end +end + +defmodule GrandparentWorkflow do + use Durable + use Durable.Helpers + use Durable.Orchestration + + workflow "grandparent_wf" do + step(:call_parent, fn data -> + case call_workflow(ParentWorkflow, %{}) do + {:ok, result} -> + {:ok, data |> assign(:nested_result, result) |> assign(:chain_complete, true)} + + {:error, reason} -> + {:ok, assign(data, :nested_error, reason)} + end + end) + end +end diff --git a/test/durable/parallel_test.exs b/test/durable/parallel_test.exs index 9d679e3..20e17ae 100644 --- a/test/durable/parallel_test.exs +++ b/test/durable/parallel_test.exs @@ -3,7 +3,7 @@ defmodule Durable.ParallelTest do alias Durable.Config alias Durable.Executor - alias Durable.Storage.Schemas.{StepExecution, WorkflowExecution} + alias Durable.Storage.Schemas.{PendingEvent, WaitGroup, WorkflowExecution} import Ecto.Query @@ -54,39 +54,109 @@ defmodule Durable.ParallelTest do end end - describe "parallel execution - results model" do - test "parallel steps produce results in __results__ map" do - {:ok, execution} = - create_and_execute_workflow(SimpleParallelTestWorkflow, %{}) + describe "parallel execution - durable fan-out" do + test "parent goes to :waiting and creates child executions" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(SimpleParallelTestWorkflow, %{}) + + # Parent should be waiting (not completed) + assert parent.status == :waiting + + # Should have created child executions + children = get_child_executions(repo, parent.id) + assert length(children) == 2 + assert Enum.all?(children, &(&1.status == :pending)) + + # 
Each child should have __parallel_step in context + Enum.each(children, fn child -> + assert child.context["__parallel_step"] != nil + assert child.parent_workflow_id == parent.id + end) + + # Parent should have parallel metadata + assert parent.context["__parallel_children"] != nil + assert parent.context["__parallel_wait_group_id"] != nil + end + + test "executing children and resuming parent produces completed workflow" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(SimpleParallelTestWorkflow, %{}) + assert parent.status == :waiting + + # Execute all children + execute_children(repo, parent.id, config) + + # Parent should be resumed (status: :pending after all children done) + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :pending - assert execution.status == :completed + # Execute parent to continue + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) - # Results should be in __results__ map with tagged tuples - results = execution.context["__results__"] + assert parent.status == :completed + + # Results should be in __results__ map + results = parent.context["__results__"] assert is_map(results) - assert results["task_a"] == ["ok", %{"from_task_a" => true, "initialized" => true}] - assert results["task_b"] == ["ok", %{"from_task_b" => true, "initialized" => true}] + assert ["ok", %{"from_task_a" => true, "initialized" => true}] = results["task_a"] + assert ["ok", %{"from_task_b" => true, "initialized" => true}] = results["task_b"] end + test "WaitGroup and PendingEvents are created for parallel children" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(SimpleParallelTestWorkflow, %{}) + + # Should have a WaitGroup + wait_groups = repo.all(from(w in WaitGroup, where: w.workflow_id == ^parent.id)) + assert length(wait_groups) == 1 + assert hd(wait_groups).wait_type == :all + assert 
length(hd(wait_groups).event_names) == 2 + + # Should have PendingEvents + events = repo.all(from(p in PendingEvent, where: p.workflow_id == ^parent.id)) + assert length(events) == 2 + assert Enum.all?(events, &(&1.status == :pending)) + end + end + + describe "parallel execution - results model" do test "next step can access __results__ from context" do - {:ok, execution} = - create_and_execute_workflow(ResultsAccessWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo - assert execution.status == :completed + {:ok, parent} = create_and_execute_workflow(ResultsAccessWorkflow, %{}) + execute_children(repo, parent.id, config) - # The final step should have processed the results - assert execution.context["processed_task_a"] == true - assert execution.context["processed_task_b"] == true + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + + assert parent.status == :completed + assert parent.context["processed_task_a"] == true + assert parent.context["processed_task_b"] == true end - test "error results are preserved as {:error, reason} in results map" do - {:ok, execution} = - create_and_execute_workflow(ErrorPreservingParallelWorkflow, %{}) + test "error results are preserved in results map" do + config = Config.get(Durable) + repo = config.repo - assert execution.status == :completed + {:ok, parent} = create_and_execute_workflow(ErrorPreservingParallelWorkflow, %{}) + execute_children(repo, parent.id, config) - # Check that error is preserved in results - results = execution.context["__results__"] + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + + assert parent.status == :completed + + results = parent.context["__results__"] assert results["good_task"] == ["ok", %{"good" => true}] assert match?(["error", _], results["bad_task"]) end @@ -94,51 +164,71 @@ 
defmodule Durable.ParallelTest do describe "parallel execution - into: callback" do test "into: callback transforms results and returns {:ok, ctx}" do - {:ok, execution} = - create_and_execute_workflow(IntoOkWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo - assert execution.status == :completed + {:ok, parent} = create_and_execute_workflow(IntoOkWorkflow, %{}) + execute_children(repo, parent.id, config) - # The into callback should have transformed the results - assert execution.context["payment_id"] == 123 - assert execution.context["delivery_status"] == "confirmed" - # __results__ should NOT be in context when into: is used - refute Map.has_key?(execution.context, "__results__") + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + + assert parent.status == :completed + assert parent.context["payment_id"] == 123 + assert parent.context["delivery_status"] == "confirmed" + refute Map.has_key?(parent.context, "__results__") end test "into: callback returning {:error, reason} fails workflow" do - {:ok, execution} = - create_and_execute_workflow(IntoErrorWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(IntoErrorWorkflow, %{}) + execute_children(repo, parent.id, config) - assert execution.status == :failed - assert execution.error["message"] == "Payment and delivery both failed" + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + + assert parent.status == :failed + assert parent.error["message"] == "Payment and delivery both failed" end test "into: callback returning {:goto, step, ctx} jumps to step" do - {:ok, execution} = - create_and_execute_workflow(IntoGotoWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(IntoGotoWorkflow, 
%{}) + execute_children(repo, parent.id, config) - assert execution.status == :completed + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) - # Should have skipped to handle_backorder step - assert execution.context["backorder_handled"] == true - # Should NOT have executed the normal_flow step - refute Map.has_key?(execution.context, "normal_flow_executed") + assert parent.status == :completed + assert parent.context["backorder_handled"] == true + refute Map.has_key?(parent.context, "normal_flow_executed") end end describe "parallel execution - returns: option" do test "returns: option changes result key name" do - {:ok, execution} = - create_and_execute_workflow(ReturnsKeyWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(ReturnsKeyWorkflow, %{}) + execute_children(repo, parent.id, config) - assert execution.status == :completed + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) - results = execution.context["__results__"] - # Should use custom key names from returns: + assert parent.status == :completed + + results = parent.context["__results__"] assert Map.has_key?(results, "order_data") assert Map.has_key?(results, "user_data") - # Should NOT have the original step names refute Map.has_key?(results, "fetch_order") refute Map.has_key?(results, "fetch_user") end @@ -146,136 +236,146 @@ defmodule Durable.ParallelTest do describe "parallel execution - error handling" do test "fail_fast stops on first error by default" do - {:ok, execution} = - create_and_execute_workflow(FailFastParallelWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(FailFastParallelWorkflow, %{}) + execute_children(repo, parent.id, config) - assert execution.status == :failed 
- # Error should be from the failing step - assert execution.error["type"] == "test_error" + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + + assert parent.status == :failed + assert parent.error["type"] == "test_error" end test "complete_all collects all results including errors" do - {:ok, execution} = - create_and_execute_workflow(CompleteAllWithResultsWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(CompleteAllWithResultsWorkflow, %{}) + execute_children(repo, parent.id, config) - assert execution.status == :completed + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) - # All results should be collected, including errors - results = execution.context["__results__"] + assert parent.status == :completed + + results = parent.context["__results__"] assert results["good_task"] == ["ok", %{"good" => true}] assert match?(["error", _], results["bad_task"]) end end - describe "parallel execution - concurrency" do - test "steps execute concurrently" do - {:ok, execution} = - create_and_execute_workflow(TimingParallelWorkflow, %{}) + describe "parallel execution - distributed" do + test "children can be executed on separate workers" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(SimpleParallelTestWorkflow, %{}) + assert parent.status == :waiting + + children = get_child_executions(repo, parent.id) + + # Execute children one at a time (simulating different workers) + Enum.each(children, fn child -> + Executor.execute_workflow(child.id, config) + child = repo.get!(WorkflowExecution, child.id) + assert child.status == :completed + end) - assert execution.status == :completed + # Parent should be resumed + parent = repo.get!(WorkflowExecution, parent.id) + 
assert parent.status == :pending - # Both steps should complete - results = execution.context["__results__"] - assert match?(["ok", _], results["slow_a"]) - assert match?(["ok", _], results["slow_b"]) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + assert parent.status == :completed end - test "10+ parallel steps execute successfully" do - {:ok, execution} = - create_and_execute_workflow(ManyParallelStepsWorkflow, %{}) + test "children can target different queues" do + config = Config.get(Durable) + repo = config.repo - assert execution.status == :completed + {:ok, parent} = create_and_execute_workflow(QueueRoutingWorkflow, %{}) + assert parent.status == :waiting - results = execution.context["__results__"] - # All 15 steps should have results - for i <- 1..15 do - key = "step_#{i}" - assert Map.has_key?(results, key), "Missing result for #{key}" - end + children = + repo.all( + from(w in WorkflowExecution, + where: w.parent_workflow_id == ^parent.id, + order_by: [asc: :inserted_at] + ) + ) + + queues = Enum.map(children, & &1.queue) |> Enum.sort() + assert "default" in queues + assert "gpu" in queues end end describe "parallel with single step" do test "parallel block with single step works correctly" do - {:ok, execution} = - create_and_execute_workflow(SingleStepParallelWorkflow, %{}) + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(SingleStepParallelWorkflow, %{}) + execute_children(repo, parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) - assert execution.status == :completed + assert parent.status == :completed - results = execution.context["__results__"] + results = parent.context["__results__"] assert results["only_task"] == ["ok", %{"from_only_task" => true, "initialized" => true}] end end - describe "parallel resume behavior 
(durability)" do - test "completed parallel steps are not re-executed on resume" do + describe "parallel with many steps" do + test "10+ parallel steps execute successfully" do + config = Config.get(Durable) + repo = config.repo + + {:ok, parent} = create_and_execute_workflow(ManyParallelStepsWorkflow, %{}) + execute_children(repo, parent.id, config) + + parent = repo.get!(WorkflowExecution, parent.id) + Executor.execute_workflow(parent.id, config) + parent = repo.get!(WorkflowExecution, parent.id) + + assert parent.status == :completed + + results = parent.context["__results__"] + + for i <- 1..15 do + key = "step_#{i}" + assert Map.has_key?(results, key), "Missing result for #{key}" + end + end + end + + describe "parallel cascade cancellation" do + test "cancelling parent cancels pending parallel children" do config = Config.get(Durable) repo = config.repo - {:ok, workflow_def} = ResumableParallelWorkflow.__default_workflow__() - - # Create workflow execution manually - attrs = %{ - workflow_module: Atom.to_string(ResumableParallelWorkflow), - workflow_name: workflow_def.name, - status: :pending, - queue: "default", - priority: 0, - input: %{}, - context: %{"initialized" => true} - } - - {:ok, execution} = - %WorkflowExecution{} - |> WorkflowExecution.changeset(attrs) - |> repo.insert() - - # Find the parallel step to get its name - parallel_step = Enum.find(workflow_def.steps, &(&1.type == :parallel)) - parallel_step_names = parallel_step.opts[:steps] - - # Manually create a completed step execution for one of the parallel steps - # This simulates a partial execution (task_a completed, task_b did not) - task_a_name = - Enum.find(parallel_step_names, &(Atom.to_string(&1) |> String.contains?("task_a"))) - - {:ok, _step_exec} = - %StepExecution{} - |> StepExecution.changeset(%{ - workflow_id: execution.id, - step_name: Atom.to_string(task_a_name), - step_type: "step", - attempt: 1, - status: :completed, - # Store result for durability - output: %{ - "__output__" 
=> nil, - "__context__" => %{"from_task_a" => "original_value", "task_a_runs" => 1}, - "__result__" => %{"from_task_a" => "original_value", "task_a_runs" => 1} - } - }) - |> repo.insert() - - # Set current_step to the parallel step (simulating a resume point) - {:ok, execution} = - execution - |> Ecto.Changeset.change(current_step: Atom.to_string(parallel_step.name)) - |> repo.update() - - # Now execute/resume the workflow - Executor.execute_workflow(execution.id, config) - execution = repo.get!(WorkflowExecution, execution.id) - - assert execution.status == :completed - - # Check results - task_a should have stored result, task_b should have new result - results = execution.context["__results__"] - assert match?(["ok", %{"from_task_a" => "original_value"}], results["task_a"]) - assert match?(["ok", %{"from_task_b" => true}], results["task_b"]) - - # Check step executions - task_a should only have 1 execution (the pre-existing one) - step_execs = get_step_executions(execution.id) - task_a_execs = Enum.filter(step_execs, &String.contains?(&1.step_name, "task_a")) - assert length(task_a_execs) == 1 + + {:ok, parent} = create_and_execute_workflow(SimpleParallelTestWorkflow, %{}) + assert parent.status == :waiting + + children = get_child_executions(repo, parent.id) + assert length(children) == 2 + assert Enum.all?(children, &(&1.status == :pending)) + + # Cancel parent + :ok = Executor.cancel_workflow(parent.id, "test_cancel") + + # Children should be cancelled + children = get_child_executions(repo, parent.id) + assert Enum.all?(children, &(&1.status == :cancelled)) end end @@ -304,17 +404,22 @@ defmodule Durable.ParallelTest do {:ok, repo.get!(WorkflowExecution, execution.id)} end - defp get_step_executions(workflow_id) do - config = Config.get(Durable) - repo = config.repo - + defp get_child_executions(repo, parent_id) do repo.all( - from(s in StepExecution, - where: s.workflow_id == ^workflow_id, - order_by: [asc: s.inserted_at] + from(w in WorkflowExecution, + 
where: w.parent_workflow_id == ^parent_id, + order_by: [asc: :inserted_at] ) ) end + + defp execute_children(repo, parent_id, config) do + children = get_child_executions(repo, parent_id) + + Enum.each(children, fn child -> + Executor.execute_workflow(child.id, config) + end) + end end # Test workflow modules @@ -424,8 +529,8 @@ defmodule IntoOkWorkflow do {{:ok, payment}, {:ok, delivery}} -> new_ctx = ctx - |> Map.put(:payment_id, payment.id) - |> Map.put(:delivery_status, delivery.status) + |> Map.put(:payment_id, payment["id"]) + |> Map.put(:delivery_status, delivery["status"]) {:ok, new_ctx} @@ -558,7 +663,6 @@ defmodule FailFastParallelWorkflow do parallel on_error: :fail_fast do step(:good_task, fn data -> - Process.sleep(50) {:ok, assign(data, :good, true)} end) @@ -598,33 +702,6 @@ defmodule CompleteAllWithResultsWorkflow do end end -defmodule TimingParallelWorkflow do - use Durable - use Durable.Helpers - - workflow "timing_parallel" do - step(:setup, fn data -> - {:ok, data} - end) - - parallel do - step(:slow_a, fn data -> - Process.sleep(50) - {:ok, assign(data, :a_done, true)} - end) - - step(:slow_b, fn data -> - Process.sleep(50) - {:ok, assign(data, :b_done, true)} - end) - end - - step(:done, fn data -> - {:ok, data} - end) - end -end - defmodule SingleStepParallelWorkflow do use Durable use Durable.Helpers @@ -679,25 +756,22 @@ defmodule ManyParallelStepsWorkflow do end end -defmodule ResumableParallelWorkflow do +defmodule QueueRoutingWorkflow do use Durable use Durable.Helpers - workflow "resumable_parallel" do + workflow "queue_routing" do step(:setup, fn data -> {:ok, assign(data, :initialized, true)} end) parallel do - step(:task_a, fn data -> - # This will be tracked to verify it doesn't re-run - current_runs = data[:task_a_runs] || 0 - data = assign(data, :task_a_runs, current_runs + 1) - {:ok, assign(data, :from_task_a, true)} + step(:gpu_task, [queue: "gpu"], fn data -> + {:ok, assign(data, :gpu_done, true)} end) - step(:task_b, fn data 
-> - {:ok, assign(data, :from_task_b, true)} + step(:default_task, fn data -> + {:ok, assign(data, :default_done, true)} end) end