Skip to content

feat(notifications): user-defined domain notifications (type: notification)#213

Draft
PolyphonyRequiem wants to merge 1 commit into
microsoft:mainfrom
PolyphonyRequiem:feat/notifications
Draft

feat(notifications): user-defined domain notifications (type: notification)#213
PolyphonyRequiem wants to merge 1 commit into
microsoft:mainfrom
PolyphonyRequiem:feat/notifications

Conversation

@PolyphonyRequiem
Copy link
Copy Markdown
Member

@PolyphonyRequiem PolyphonyRequiem commented May 19, 2026

Summary

Adds a new type: notification step that lets workflow authors publish typed, declared domain events to a dedicated JSONL stream for external tooling to hook off.

Notifications are distinct from execution events (agent_started, route_evaluated, ...): execution events describe what the runtime is doing; notifications describe what the workflow is asserting to the outside world.

Notifications are a visibility primitive, not a control-flow primitive. They do not block, do not wait for an ack, and do not gate routing. If a decision needs to flow back into the workflow, use human_gate.

YAML surface

workflow:
  notifications:
    namespace: polyphony.feature_pr   # defaults to slugified workflow name
    correlation: [apex_id]            # inherited by sub-workflows
    types:
      pr_ready:
        version: 1
        payload:
          pr_url: { type: string }
          pr_id:  { type: number }

agents:
  - name: announce_ready
    type: notification
    notification: pr_ready
    payload:
      pr_url: "{{ open_pr.output.url }}"
      pr_id:  "{{ open_pr.output.id }}"
    routes:
      - to: next_step       # emission is independent of routing

Envelope shape

  • emission_id = <run_id>:<dotted_step_path>:<iteration> — stable across resume/replay so the first downstream consumer can dedupe.
  • schema_id = <namespace>.<type>@<version> — consumers MUST pin on this, not on notification_type alone.
  • correlation — first-class field; auto-merged from declared keys and inherited from the parent workflow (parent wins on collision so the upstream trail survives deep nesting).
  • payload — Jinja2-rendered then type-validated against the declared OutputField schema.

Delivery (v1)

Every emission flows through WorkflowEventEmitter and is written by NotificationLogSubscriber to a dedicated …notifications.jsonl file next to the existing event log (path printed at end-of-run).

No webhooks, shell hooks, or model-driven emission in v1.

What's in this PR

  • src/conductor/config/schema.pyNotificationsConfig + NotificationTypeDef; notification and payload fields on AgentDef; strict positive/negative validators.
  • src/conductor/config/validator.py — cross-references undeclared types, payload field mismatches, unknown correlation keys.
  • src/conductor/executor/notification.py — pure envelope builder (rendering, coercion, type validation, emission-id construction).
  • src/conductor/engine/workflow.py — dispatch branch + correlation snapshot inherited into sub-workflows.
  • src/conductor/engine/event_log.pyNotificationLogSubscriber filters to notification events only.
  • src/conductor/cli/run.py — wires the subscriber in both run and resume paths.
  • examples/notifications.yaml — runnable example with namespace, correlation, mid-workflow emission.
  • AGENTS.md — full design + schema-version-bump policy + "visibility not control flow" callout.
  • Tests: tests/test_config/test_notifications.py + tests/test_executor/test_notification.py (18 focused tests).

Schema-version-bump policy (documented, not enforced)

  • Adding an optional field is non-breaking — do not bump version.
  • Removing/renaming a field, changing a field's type, or making an optional field required is breaking — bump version and emit under the new schema_id.
  • During a transition a workflow MAY emit both pr_ready@1 and pr_ready@2 from adjacent steps.
  • Consumers MUST pin on schema_id.

Validation

Undeclared type, payload field mismatch, unknown correlation key, or a type mismatch on a rendered value all fail the workflow loudly with ValidationError. The contract is the whole point.

Testing

  • uv run ruff check — clean
  • uv run ruff format --check — clean
  • uv run pytest tests/test_config tests/test_executor tests/test_engine tests/test_cli -m "not performance" — 1692 passed, 5 skipped
  • uv run conductor validate examples/notifications.yaml — clean
  • All other examples/*.yaml still validate clean.

Deliberately deferred

  • Edge-attached notifications (notify: on RouteDef) — possible v1.5 sugar reusing the same envelope.
  • Webhooks / shell hooks for delivery.
  • MCP tool for model-driven emission.
  • Ack / response semantics (use human_gate instead).

…ification)

Introduces a new `type: notification` step that lets workflow authors
publish typed, declared *domain events* to a dedicated JSONL stream for
external tooling to hook off. Distinct from execution events
(`agent_started`, `route_evaluated`, ...): execution events describe
what the runtime is doing; notifications describe what the workflow is
asserting to the outside world.

Notifications are a visibility primitive, not a control-flow primitive.
They do not block, do not wait for an ack, and do not gate routing. If a
decision needs to flow back into the workflow, use `human_gate`.

Workflow-level config (`workflow.notifications`):
- `namespace` -- dotted-identifier (defaults to slugified workflow name)
- `correlation` -- workflow input keys auto-surfaced on every emission;
  inherited by sub-workflows (parent wins on collision so the upstream
  trail survives deep nesting)
- `types` -- per-type `version` + `payload` schema (reusing
  `OutputField`)

Each emission ships a fixed envelope:
- `emission_id` = `<run_id>:<dotted_step_path>:<iteration>` -- stable
  across resume/replay so the first downstream consumer can dedupe
- `schema_id` = `<namespace>.<type>@<version>` -- consumers MUST pin
  on this, not on `notification_type` alone
- `correlation` -- first-class, not buried in metadata
- `payload` -- Jinja2-rendered then type-validated

Delivery (v1): rides `WorkflowEventEmitter`;
`NotificationLogSubscriber` writes a dedicated
`...notifications.jsonl` next to the existing event log. No webhooks,
shell hooks, or model-driven emission in v1.

Schema-version-bump policy documented in AGENTS.md (additive = no bump;
remove/rename/retype/required = bump; consumers MUST pin on schema_id).

Validation failures (undeclared type, payload field mismatch, unknown
correlation key, rendered-value type mismatch) fail the workflow loudly
with `ValidationError` -- the contract is the point.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 70.27027% with 88 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@75b01b5). Learn more about missing BASE report.

Files with missing lines Patch % Lines
src/conductor/engine/workflow.py 33.33% 30 Missing ⚠️
src/conductor/executor/notification.py 68.47% 29 Missing ⚠️
src/conductor/config/schema.py 77.64% 19 Missing ⚠️
src/conductor/engine/event_log.py 68.96% 9 Missing ⚠️
src/conductor/config/validator.py 96.77% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #213   +/-   ##
=======================================
  Coverage        ?   87.67%           
=======================================
  Files           ?       61           
  Lines           ?     9943           
  Branches        ?        0           
=======================================
  Hits            ?     8718           
  Misses          ?     1225           
  Partials        ?        0           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Collaborator

@jrob5756 jrob5756 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work. I have a few questions/comments below

continue

# Handle notification steps (fire-and-forget visibility)
if agent.type == "notification":
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parallel / for-each containment is undefined. This dispatch lives only in the sequential path; behavior inside parallel: or for_each: is unspecified and get_agent_execution_count won't give per-slot iteration counts (breaking emission_id uniqueness).

Either disallow at validation time, or thread emission through the group dispatchers + add tests.

# notifications block exists when a notification step
# is present, but assert defensively at runtime so
# the type narrowing is explicit for callers.
assert notif_config is not None # noqa: S101
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert is stripped under python -O — this becomes a no-op and the next line dereferences None. Make it an explicit raise.

# The domain notification itself — this is what
# external tooling tails the notifications.jsonl
# subscriber for.
self._emit("notification", envelope)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dashboard rendering for the new event types is missing. Per your discussion with Daniel about a bottom-pane tab at the workflow level — worth landing alongside this PR (or as an immediate follow-up) so --web users actually see emissions. Currently the four new event types (notification_started, notification, notification_completed, notification_failed) and the new step type aren't handled in workflow-store.ts or the Cytoscape view.

# Store an empty dict so routes can fire without
# exposing notification internals as agent output
# (consistent with the "no response flow" contract).
self.context.store(agent.name, {})
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing {} means {{ announce_ready.output.x }} downstream silently renders empty under accumulate mode — quiet footgun for authors who confuse a notification with a regular agent. Either document the behavior in AGENTS.md, or store a sentinel that lets templates fail loudly.

Comment thread src/conductor/cli/run.py
# run_id stem with the event log so the two files line up.
from conductor.engine.event_log import NotificationLogSubscriber

notification_log_subscriber = NotificationLogSubscriber(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick confirmation: on resume the dashboard already replays the main event log (dashboard.replay_events_from_jsonl at line 1823), and notification events flow through the same emitter — so old emissions will render in the dashboard on resume. The separate .notifications.jsonl is fresh per resume, which is fine for external tailers given the stable emission_id dedup. No action needed; flagging only to confirm the behavior is intentional.

return value
try:
return json.loads(value)
except (json.JSONDecodeError, ValueError):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silent fallback to the raw string means a bad template (e.g. "3.14abc" for a number field) fails one frame later with "got str" rather than "failed to coerce 'X' as number". Better author UX to raise here with the offending value.

"source_agent": agent.name,
"subworkflow_path": list(subworkflow_path),
"correlation": dict(correlation),
"workflow_metadata": dict(workflow_metadata),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: workflow_metadata feels redundant in the envelope. Consumers already get run_id + workflow + subworkflow_path and can join against the event log if they need more. Splatting the whole metadata dict into every emission bloats the wire format and couples the notification schema to whatever ad-hoc keys workflows happen to use. Consider dropping it (or opt-in allowlist).

effort: high
"""

notification: str | None = None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: type: notification + notification: pr_ready reads redundant. Siblings use scriptcommand, human_gatedialog. Something like emit: or event: would parse more naturally — easier to rename now than after it ships.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants