
Durable workflow recovery, agent compaction, BC hardening #16

Merged

msmakouz merged 9 commits into v4 from feature/signal-node on Apr 15, 2026

Conversation

wolfy-j (Contributor) commented on Apr 12, 2026

Summary

  • Signal, cycle, parallel, and agent nodes survive orchestrator restarts with correct-once semantics
  • Agent conversation compaction via token-threshold checkpoints (opt-in per node config)
  • Backward-compatible API contracts restored (client:execute, node SDK, parallel output, func input)
  • Migration 04 adds unique partial indexes for workflow outputs and node results

Recovery coverage

  • Chaos tests: 8/8 on both PG and SQLite (5-8 random kills per test)
  • Agent recovery: 4 tests (mid-tool-call, post-LLM, multi-turn, restart)
  • Cycle recovery: legacy state upgrade shim + regression test
  • Parallel recovery: durable batch cursor with attempt_id rotation
  • Nested recovery: parallel-of-cycles, cycle-of-signals, signal-of-parallel

Agent compaction

  • config.compact.token_threshold + config.compact.function_id (both optional, both required to enable)
  • Marker written as AGENT_MEMORY row with metadata.compaction_marker = true
  • Prompt builder filters history by latest marker; structured tool results preserved across cut line
  • Warning mode (default): compaction errors are logged as observations and do not kill the turn
  • Strict mode (opt-in compact.strict = true): hard fail on compaction error
  • Summary size cap: compact.max_memory_chars (default 8192)
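The marker-based history filtering above can be sketched in a few lines of plain Lua. This is an illustrative sketch, not the actual prompt-builder code: the history rows and the metadata.compaction_marker field follow the PR text, while the helper names and the tool_result role check are assumptions.

```lua
-- Find the index of the latest compaction marker in a conversation history.
local function latest_marker_index(history)
  for i = #history, 1, -1 do
    local meta = history[i].metadata
    if meta and meta.compaction_marker then
      return i
    end
  end
  return nil
end

-- Build the effective prompt history: everything from the latest marker
-- onward, plus structured tool results that fall before the cut line.
local function effective_history(history)
  local cut = latest_marker_index(history)
  if not cut then
    return history -- no compaction yet: use the full history
  end
  local out = {}
  for i, row in ipairs(history) do
    if i >= cut or row.role == "tool_result" then
      out[#out + 1] = row
    end
  end
  return out
end
```

The key property is that the cut is recomputed from the latest marker on every prompt build, so repeated compactions compose naturally.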

BC hardening

  • client:execute returns (result, errors.new({...})) on failure (both values)
  • Node SDK inputs()/input()/complete() return (nil, err) not throw
  • Parallel preserves legacy filter/unwrap output shaping
  • Func single-named-input wrapped as {discriminator = content}
  • Cycle legacy state upgrade shim for in-flight v4 workflows
  • Migration 04: CREATE UNIQUE INDEX IF NOT EXISTS + pre-dedupe
  • Prompt builder provider_metadata forwarding restored
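The single-named-input wrapping restored for func nodes can be sketched as follows. This is a hypothetical helper, not the actual compiler code: when a func declares exactly one named input, the raw content is wrapped under that name so v4 handlers keep receiving the old {discriminator = content} shape.

```lua
-- Wrap raw content as { [name] = content } when the func node declares
-- exactly one named input; otherwise pass the content through unchanged.
local function wrap_single_input(input_names, content)
  if #input_names == 1 then
    return { [input_names[1]] = content }
  end
  return content
end
```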

Test plan

  • PG full suite: 2180 tests / 0 failed / 1 skipped
  • SQLite full suite: 2196 tests / 0 failed / 1 skipped
  • wippy lint: 0 errors
  • Chaos suite 8/8 on both backends
  • Agent compaction 22/22 tests
  • BC regression tests (BC_REGRESSION_C1-C5, S1-S2)

wolfy-j added 9 commits April 8, 2026 23:31
Signal node pauses workflow execution and waits for an external signal.
State is persisted to DB via the outbox pattern, surviving process crashes.

Changes:
- node/signal/node.lua: signal node that yields with wait_for_signal flag
- runner/orchestrator.lua: handle signal yields (don't reply immediately),
  guard against duplicate orchestrators via registry.register error check
- runner/scheduler.lua: detect signal_data arrival on signal yields
- runner/workflow_state.lua: deliver NODE_SIGNAL data to matching yields
- client.lua: signal() method writes to outbox + auto-respawns dead orchestrator
- flow/flow.lua: :signal() builder method
- flow/compiler.lua: SIGNAL op type
- consts.lua: SATISFY_SIGNAL command type, NODE_SIGNAL data type

Usage:
  flow.create()
    :with_input(task)
    :func("app:prepare")
    :signal({ signal_id = "approval" })
    :func("app:execute")
    :run()

  -- External trigger:
  client:signal(dataflow_id, "approval", { approved = true })

Recovery:
- Signal commit persists to dataflow_commits (outbox)
- Dead orchestrator auto-respawned by client:signal()
- Duplicate orchestrator detection via registry.register
- orchestrator.load_state() replays pending commits on restart
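The replay step can be sketched as a small idempotent loop. This is a minimal sketch with hypothetical names (commits, applied, op_id field): pending outbox commits are read in insertion order and applied at most once, skipping op_ids already seen.

```lua
-- Replay pending outbox commits idempotently: each commit carries an
-- op_id, and already-applied op_ids are skipped on restart.
local function replay_pending_commits(commits, applied)
  local replayed = 0
  for _, c in ipairs(commits) do
    if not applied[c.op_id] then
      -- apply_commit(c) would mutate workflow state here
      applied[c.op_id] = true
      replayed = replayed + 1
    end
  end
  return replayed
end
```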

10 new signal tests, all 1882 existing tests pass.

- node.lua: pass wait_for_signal and signal_id through yield_context
  (both in commit data and yield_request message)
- workflow_state.lua: skip signal yield reconstruction for reset nodes
  (node process is dead after crash, will re-yield when re-run)
- workflow_state.lua: track_yield() checks for existing signal data in DB
  (handles case where signal arrived while orchestrator was down)
- workflow_state.lua: reconstruct wait_for_signal/signal_id from yield data

Recovery model:
1. Orchestrator crashes -> all linked node processes die
2. On restart: RUNNING nodes reset to PENDING
3. Signal yields for reset nodes NOT reconstructed (stale reply_to)
4. Nodes re-run naturally, re-yield with fresh process/channels
5. track_yield() checks DB for existing signal data -> immediate satisfaction
6. Regular yields with children still reconstructed (children may have completed)
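Step 5 of the model above can be sketched in isolation. This is a hedged sketch, not the real workflow_state.lua: signal_store stands in for the DB lookup, and the return shape is an assumption; the point is that a signal which arrived while the orchestrator was down satisfies the yield immediately when it is tracked.

```lua
-- On track_yield, check for signal data that landed while the
-- orchestrator was down; if present, satisfy the yield immediately
-- instead of waiting for a new delivery.
local function track_yield(yield, signal_store)
  if yield.wait_for_signal then
    local data = signal_store[yield.signal_id]
    if data ~= nil then
      return { satisfied = true, data = data }
    end
  end
  return { satisfied = false }
end
```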

All 1887 tests pass (1872 existing + 10 signal unit + 5 recovery integration).
Recovery tests prove: basic signal flow, kill-and-recover, commit backlog,
duplicate signals, and concurrent signal+respawn race conditions.

Signal node: handle nil results from yield (graceful shutdown case) by
returning 0 instead of failing. Yield state is persisted, resumes on restart.

Recovery tests now cover:
- Basic: start/wait/signal, data passthrough, wrong signal_id, empty data
- Kill/recover: signal kill, func kill, pipeline kill, double kill
- Backlog: signal while dead, signal before start (pre-queued)
- Idempotency: duplicates, concurrent race
- Multiple workflows: independent, cross-workflow isolation
- Pipeline: func -> signal -> func normal flow

12/12 individual tests pass (1 suite timeout from total runtime > 30s).
1894 total tests pass across entire dataflow suite.

Recovery:
- Signal node yield with detached recovery across orchestrator restarts
- Cycle node persists state+last_result after child completion, legacy state upgrade shim
- Parallel node durable batch cursor with attempt_id rotation
- Deterministic yield reconstruction (UUID v7 sort, satisfied-yield filter)
- Scheduler loops past SATISFY_YIELD for dead-reply-to yields
- Commit outbox explicit op_id replay barrier
- Migration 04: unique partial indexes for workflow outputs and node results
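The durable batch cursor with attempt_id rotation can be sketched like this. All names here are assumptions, not the real parallel node: batch progress is persisted after each batch commits, and every resume bumps attempt_id so stale in-flight results from a previous attempt can be recognized and discarded.

```lua
-- Rebuild the cursor from persisted state, rotating attempt_id on
-- every resume so stale results from earlier attempts are identifiable.
local function resume_cursor(persisted)
  return {
    next_index = persisted and persisted.next_index or 1,
    attempt_id = (persisted and persisted.attempt_id or 0) + 1,
  }
end

-- Advance the cursor by one batch; the caller persists the cursor
-- once the batch's results have committed.
local function advance(cursor, batch_size, total)
  local from = cursor.next_index
  local to = math.min(from + batch_size - 1, total)
  cursor.next_index = to + 1
  return from, to
end
```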

Agent compaction:
- Token-threshold checkpoint with configurable summarizer function
- Prompt builder filters history by latest compaction marker
- Structured tool results preserved across compaction cut line
- Warning mode (default) vs strict mode for compaction errors
- Summary size cap (default 8192 chars)

BC hardening:
- client:execute returns (result, errors.new({...})) on failure
- Node SDK inputs()/input()/complete() restored to return-pair contract
- Parallel legacy filter/unwrap output shaping preserved
- Func single-named-input wrapping restored
- prompt_builder provider_metadata forwarding restored

Tests: PG 2180/0/1, SQLite 2196/0/1, lint 0 errors.

Normalize item_pipeline/item_steps config, gate output shaping for
fresh runs vs recovery path, preserve v4 filter/unwrap/on_error behavior.

Consistent with the client.lua pattern: callers can branch on either
if err then or if not result.success then.
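The return-pair contract both caller styles rely on can be sketched as follows. This is a hypothetical stand-in for client:execute, and the error shape only mimics errors.new({...}): failures come back as (result, err) rather than being thrown, so both branching styles see the same failure.

```lua
-- Return-pair contract: on failure, both a result carrying a success
-- flag and an error value are returned, never thrown.
local function execute(ok, payload)
  if ok then
    return { success = true, output = payload }, nil
  end
  local err = { code = "EXEC_FAILED", message = payload }
  return { success = false, error = err }, err
end

-- Both caller styles work against the same return values:
local result, err = execute(false, "boom")
if err then --[[ handle error ]] end
if not result.success then --[[ same branch, legacy style ]] end
```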
msmakouz merged commit 7846035 into v4 on Apr 15, 2026
msmakouz deleted the feature/signal-node branch on April 15, 2026 at 08:37