Durable workflow recovery, agent compaction, BC hardening (#16)
Merged
Signal node pauses workflow execution and waits for an external signal.
State is persisted to DB via the outbox pattern, surviving process crashes.
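The outbox pattern mentioned above can be sketched as follows. This is a minimal Python illustration, not the actual dataflow schema: table and column names (`node_state`, `outbox`, `op_id`, `applied`) are assumptions. The key property is that the state change and the pending command land in one transaction, so a crash can never lose a command the caller saw acknowledged.

```python
import json
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE node_state (node_id TEXT PRIMARY KEY, status TEXT);
    CREATE TABLE outbox (op_id INTEGER PRIMARY KEY AUTOINCREMENT,
                         command TEXT, applied INTEGER DEFAULT 0);
""")

def commit_signal(node_id, payload):
    with db:  # one transaction: state change + outbox row together
        db.execute("INSERT OR REPLACE INTO node_state VALUES (?, 'WAITING')",
                   (node_id,))
        db.execute("INSERT INTO outbox (command) VALUES (?)",
                   (json.dumps({"type": "SATISFY_SIGNAL", "node": node_id,
                                "data": payload}),))

def replay_pending():
    # On restart, the orchestrator replays unapplied commits in op_id order.
    rows = db.execute("SELECT op_id, command FROM outbox WHERE applied = 0 "
                      "ORDER BY op_id").fetchall()
    for op_id, _ in rows:
        db.execute("UPDATE outbox SET applied = 1 WHERE op_id = ?", (op_id,))
    return [json.loads(command) for _, command in rows]

commit_signal("approval", {"approved": True})
pending = replay_pending()
```

Replaying in `op_id` order and marking rows applied is also what makes the replay a barrier: a second replay finds nothing to do.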
Changes:
- node/signal/node.lua: signal node that yields with wait_for_signal flag
- runner/orchestrator.lua: handle signal yields (don't reply immediately),
guard against duplicate orchestrators via registry.register error check
- runner/scheduler.lua: detect signal_data arrival on signal yields
- runner/workflow_state.lua: deliver NODE_SIGNAL data to matching yields
- client.lua: signal() method writes to outbox + auto-respawns dead orchestrator
- flow/flow.lua: :signal() builder method
- flow/compiler.lua: SIGNAL op type
- consts.lua: SATISFY_SIGNAL command type, NODE_SIGNAL data type
Usage:

```lua
flow.create()
    :with_input(task)
    :func("app:prepare")
    :signal({ signal_id = "approval" })
    :func("app:execute")
    :run()

-- External trigger:
client:signal(dataflow_id, "approval", { approved = true })
```
Recovery:
- Signal commit persists to dataflow_commits (outbox)
- Dead orchestrator auto-respawned by client:signal()
- Duplicate orchestrator detection via registry.register
- orchestrator.load_state() replays pending commits on restart
10 new signal tests, all 1882 existing tests pass.
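The respawn-plus-duplicate-detection sequence above can be sketched like this. It is a hedged Python stand-in (the `Registry` class and the in-memory `outbox` list are placeholders for the real runtime): the signal is persisted before anything else, and whichever caller wins the registry race respawns the orchestrator while the loser backs off, knowing the running one will replay the commit.

```python
class Registry:
    """Toy name registry: register() fails on a duplicate name."""
    def __init__(self):
        self._names = set()

    def register(self, name):
        if name in self._names:
            raise RuntimeError("duplicate orchestrator: " + name)
        self._names.add(name)

registry = Registry()
outbox = []  # stands in for the dataflow_commits table

def signal(dataflow_id, signal_id, data):
    # Persist first: the commit survives even if no orchestrator is alive.
    outbox.append((dataflow_id, signal_id, data))
    try:
        registry.register("orchestrator:" + dataflow_id)
        return "respawned"        # we won the race: start a fresh orchestrator
    except RuntimeError:
        return "already-running"  # duplicate detected: the live one replays

first = signal("df-1", "approval", {"approved": True})
second = signal("df-1", "approval", {"approved": True})
```

Making registration the race arbiter means duplicate signals are safe: both callers persist a commit, but only one orchestrator ever runs.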
- node.lua: pass wait_for_signal and signal_id through yield_context (both in commit data and yield_request message)
- workflow_state.lua: skip signal yield reconstruction for reset nodes (node process is dead after crash, will re-yield when re-run)
- workflow_state.lua: track_yield() checks for existing signal data in DB (handles case where signal arrived while orchestrator was down)
- workflow_state.lua: reconstruct wait_for_signal/signal_id from yield data

Recovery model:
1. Orchestrator crashes -> all linked node processes die
2. On restart: RUNNING nodes reset to PENDING
3. Signal yields for reset nodes NOT reconstructed (stale reply_to)
4. Nodes re-run naturally, re-yield with fresh process/channels
5. track_yield() checks DB for existing signal data -> immediate satisfaction
6. Regular yields with children still reconstructed (children may have completed)

All 1887 tests pass (1872 existing + 10 signal unit + 5 recovery integration). Recovery tests prove: basic signal flow, kill-and-recover, commit backlog, duplicate signals, and concurrent signal+respawn race conditions.
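The "immediate satisfaction" step in the recovery model can be sketched in a few lines. This is a conceptual Python sketch, not the real `track_yield()` (the storage layer is a plain dict here): when a node re-yields after recovery, the tracker first checks whether the signal already arrived while the orchestrator was down, and replies immediately instead of parking the yield.

```python
signal_store = {}    # signal_id -> data; stands in for the DB table
pending_yields = {}  # signal_id -> reply callback

def deliver_signal(signal_id, data):
    signal_store[signal_id] = data          # persist, even with no waiter
    reply = pending_yields.pop(signal_id, None)
    if reply:
        reply(data)                         # waiter present: satisfy now

def track_yield(signal_id, reply):
    if signal_id in signal_store:           # arrived while we were down
        reply(signal_store[signal_id])      # immediate satisfaction
    else:
        pending_yields[signal_id] = reply   # park until the signal lands

got = []
deliver_signal("approval", {"approved": True})  # signal lands first
track_yield("approval", got.append)             # node re-yields after restart
```

Checking the store before parking is what closes the race between a crashed orchestrator and an in-flight external signal.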
Signal node: handle nil results from yield (graceful shutdown case) by returning 0 instead of failing. Yield state is persisted, resumes on restart.

Recovery tests now cover:
- Basic: start/wait/signal, data passthrough, wrong signal_id, empty data
- Kill/recover: signal kill, func kill, pipeline kill, double kill
- Backlog: signal while dead, signal before start (pre-queued)
- Idempotency: duplicates, concurrent race
- Multiple workflows: independent, cross-workflow isolation
- Pipeline: func -> signal -> func normal flow

12/12 individual tests pass (1 suite timeout from total runtime > 30s). 1894 total tests pass across entire dataflow suite.
Recovery:
- Signal node yield with detached recovery across orchestrator restarts
- Cycle node persists state+last_result after child completion, legacy state upgrade shim
- Parallel node durable batch cursor with attempt_id rotation
- Deterministic yield reconstruction (UUID v7 sort, satisfied-yield filter)
- Scheduler loops past SATISFY_YIELD for dead-reply-to yields
- Commit outbox explicit op_id replay barrier
- Migration 04: unique partial indexes for workflow outputs and node results
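The durable batch cursor with attempt_id rotation can be illustrated as follows. This is an assumed Python sketch, not the parallel node's actual code (field names `cursor` and `attempt_id` are illustrative): progress is persisted after every item, and each restart rotates the attempt_id so stale replies tagged with an old attempt can be discarded.

```python
import uuid

state = {"cursor": 0, "attempt_id": None}  # stands in for persisted node state

def run_batch(items):
    state["attempt_id"] = str(uuid.uuid4())  # rotate on every (re)start
    results = []
    while state["cursor"] < len(items):
        item = items[state["cursor"]]
        results.append((state["attempt_id"], item * 2))  # tag with attempt
        state["cursor"] += 1                 # persist cursor after each item
    return results

first = run_batch([1, 2, 3])
state["cursor"] = 1                 # simulate a crash after the first item
resumed = run_batch([1, 2, 3])      # restart: fresh attempt_id, resume at cursor
```

Because the resumed run starts from the persisted cursor, items are not reprocessed, and because its attempt_id differs, any late reply from the dead attempt is distinguishable.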
Agent compaction:
- Token-threshold checkpoint with configurable summarizer function
- Prompt builder filters history by latest compaction marker
- Structured tool results preserved across compaction cut line
- Warning mode (default) vs strict mode for compaction errors
- Summary size cap (default 8192 chars)
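The compaction flow above can be sketched end to end. This is a hedged Python sketch (the token counter and summarizer are placeholders, and the 8192-char cap mirrors the default stated above): once the history crosses the threshold it is summarized into a capped checkpoint message carrying a compaction marker, and the prompt builder keeps only messages from the latest marker onward.

```python
MAX_SUMMARY_CHARS = 8192  # mirrors the default summary size cap above

def count_tokens(history):
    # Crude stand-in for a real tokenizer: whitespace word count.
    return sum(len(m["content"].split()) for m in history)

def compact(history, threshold, summarize):
    if count_tokens(history) < threshold:
        return history
    summary = summarize(history)[:MAX_SUMMARY_CHARS]  # enforce the cap
    history.append({"role": "system", "content": summary,
                    "metadata": {"compaction_marker": True}})
    return history

def build_prompt(history):
    # Keep only messages at or after the latest compaction marker.
    for i in range(len(history) - 1, -1, -1):
        if history[i].get("metadata", {}).get("compaction_marker"):
            return history[i:]
    return history

hist = [{"role": "user", "content": "a " * 50},
        {"role": "assistant", "content": "b " * 50}]
hist = compact(hist, threshold=10,
               summarize=lambda h: "summary of %d earlier messages" % len(h))
prompt = build_prompt(hist)
```

Filtering by marker rather than deleting history means the full transcript stays available for audit while the prompt stays bounded.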
BC hardening:
- client:execute returns (result, errors.new({...})) on failure
- Node SDK inputs()/input()/complete() restored to return-pair contract
- Parallel legacy filter/unwrap output shaping preserved
- Func single-named-input wrapping restored
- prompt_builder provider_metadata forwarding restored
Tests: PG 2180 passed / 0 failed / 1 skipped, SQLite 2196 passed / 0 failed / 1 skipped, lint 0 errors.
Normalize item_pipeline/item_steps config, gate output shaping for fresh runs vs recovery path, preserve v4 filter/unwrap/on_error behavior.
Consistent with the client.lua pattern: callers can use either `if err then` or `if not result.success then`.
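The dual error contract described above can be sketched in Python (names like `Result` and the dict standing in for `errors.new({...})` are assumptions, not the real API): the call returns both a result object carrying a success flag and a structured error, so either check works.

```python
class Result:
    """Minimal result object with a success flag, as in the BC contract."""
    def __init__(self, success, value=None):
        self.success = success
        self.value = value

def execute(fn, *args):
    # Return-pair contract: (result, err). Never throws to the caller.
    try:
        return Result(True, fn(*args)), None
    except Exception as exc:
        return Result(False), {"message": str(exc)}  # errors.new analogue

ok_result, ok_err = execute(lambda x: x + 1, 41)
bad_result, bad_err = execute(lambda: 1 / 0)
```

Returning both values is what keeps old call sites (checking `result.success`) and new ones (checking `err`) working against the same function.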
Summary
Recovery coverage
Agent compaction
- `config.compact.token_threshold` + `config.compact.function_id` (both optional, both required to enable)
- compaction checkpoints tagged with `metadata.compaction_marker = true`
- strict mode (`compact.strict = true`): hard fail on compaction error
- summary cap via `compact.max_memory_chars` (default 8192)
BC hardening
- `client:execute` returns `(result, errors.new({...}))` on failure (both values)
- `inputs()`/`input()`/`complete()` return `(nil, err)`, not throw
- `filter`/`unwrap` output shaping
- single-named-input wrapping `{discriminator = content}`
- `CREATE UNIQUE INDEX IF NOT EXISTS` + pre-dedupe
- `provider_metadata` forwarding restored
Test plan