Summary
The workflows/v1 proto package landed in #1ef7dbe. The objectives service already owns objective lifecycle, wake scheduling, provenance, and mutation tracking. Workflows are compositions of objectives, so the orchestration logic belongs next to the thing it coordinates: in the objectives service.
What needs to happen
Phase 1: Workflow definitions and versioning
- Implement CreateDefinition, GetDefinition, ListDefinitions, PublishVersion, ListVersions RPCs in objectives
- Add workflow_definitions table: id, workspace_id, name, description, latest_version, tags (text[]), created_at, updated_at
- Add workflow_versions table: id, definition_id, version (serial per definition), steps (jsonb), edges (jsonb), trigger (jsonb), default_retry_policy (jsonb), timeout_seconds, author, created_at
- Validate DAG acyclicity on PublishVersion
- Publish workflows.changes.definition.create / version.publish CloudEvents
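
One way to validate DAG acyclicity on PublishVersion is Kahn's algorithm. The sketch below assumes steps are identified by string ids and edges are (from, to) pairs; the function name `topological_order` and these input shapes are illustrative and not taken from the workflows/v1 messages.

```python
from collections import deque


def topological_order(step_ids, edges):
    """Return step ids in topological order, or raise ValueError on a cycle.

    step_ids: iterable of step id strings (assumed shape).
    edges: iterable of (from_step_id, to_step_id) pairs (assumed shape).
    """
    step_ids = list(step_ids)
    indegree = {s: 0 for s in step_ids}
    children = {s: [] for s in step_ids}
    for src, dst in edges:
        if src not in indegree or dst not in indegree:
            raise ValueError(f"edge references unknown step: {src} -> {dst}")
        children[src].append(dst)
        indegree[dst] += 1

    # Start from steps with no incoming edges and peel the graph layer by layer.
    ready = deque(s for s in step_ids if indegree[s] == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Any step never reached sits on a cycle or downstream of one.
    if len(order) != len(step_ids):
        stuck = sorted(s for s in step_ids if indegree[s] > 0)
        raise ValueError(f"workflow graph has a cycle; unresolved steps: {stuck}")
    return order
```

Rejecting the publish when this raises keeps cyclic graphs out of workflow_versions, so the executor can rely on every stored version being a DAG.
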
Phase 2: Workflow execution engine
- Implement StartRun, GetRun, ListRuns, TransitionStep RPCs
- Add workflow_runs table matching WorkflowRun message
- Add step_runs table matching StepRun message
- DAG executor: on run start, identify root steps (no incoming edges), create objectives for AGENT_ACTION steps, and transition to RUNNING
- Step completion listener: subscribe to NATS for objective state changes, approval resolutions, governance decisions, CloudEvent arrivals
- On step completion: evaluate outgoing edges, start next steps, accumulate cost_usd
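
The executor loop above can be sketched in a few functions. This is a simplified model, not the proposed implementation: it assumes every outgoing edge is followed unconditionally and that a step starts only once all of its predecessors have completed (BRANCH steps would need edge conditions on top of this). The `run` dict and the function names are hypothetical.

```python
from decimal import Decimal


def root_steps(step_ids, edges):
    """Steps with no incoming edges; these start when the run starts."""
    targets = {dst for _, dst in edges}
    return [s for s in step_ids if s not in targets]


def start_run(step_ids, edges):
    """Create in-memory run state (hypothetical shape) and return the root steps."""
    roots = root_steps(step_ids, edges)
    run = {
        "edges": list(edges),
        "started": set(roots),
        "completed": set(),
        "cost_usd": Decimal("0"),
    }
    return run, roots


def on_step_completed(run, step_id, step_cost_usd):
    """Record a completion, accumulate cost, and return the steps to start next."""
    run["completed"].add(step_id)
    run["cost_usd"] += Decimal(step_cost_usd)
    ready = []
    for src, dst in run["edges"]:
        if src != step_id or dst in run["started"]:
            continue
        # Assumption: a step waits for all of its predecessors.
        predecessors = {s for s, d in run["edges"] if d == dst}
        if predecessors <= run["completed"]:
            run["started"].add(dst)
            ready.append(dst)
    return ready
```

Costs are passed as strings and summed as `Decimal` so that repeated additions do not pick up binary floating-point error.
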
Phase 3: Step type implementations
| Step Type | Implementation |
| --- | --- |
| AGENT_ACTION | Create objective via ObjectiveService.Create, optionally delegate via AgentService.Delegate |
| APPROVAL_GATE | Create approval via ApprovalService.RequestApproval, block until resolved |
| GOVERNANCE_CHECK | Call GovernanceService.EvaluateAction, fail on DENY, insert implicit approval on REQUIRE_APPROVAL |
| WAIT_FOR_EVENT | Subscribe to NATS with event_type_pattern + event_subject_pattern filter, block until match or timeout |
| BRANCH | Evaluate condition_expr against prior step outputs, follow matching edge |
| PARALLEL | Fan out to all child steps concurrently, complete when all children complete |
| NOTIFICATION | Send via NotificationService.Send, non-blocking |
| HUMAN_TASK | Create pending step, expose via API/product surface for manual completion |
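
As an illustration of the GOVERNANCE_CHECK row, the mapping from decision to step outcome might look like the sketch below. Only DENY and REQUIRE_APPROVAL are named in this issue; the ALLOW value, the state strings, and the shape of the inserted approval step are assumptions made for the example.

```python
from enum import Enum


class Decision(Enum):
    ALLOW = "ALLOW"  # assumed; not named in the issue
    DENY = "DENY"
    REQUIRE_APPROVAL = "REQUIRE_APPROVAL"


def apply_governance_decision(step_id, decision):
    """Map a governance decision to a step outcome (hypothetical shapes)."""
    if decision is Decision.DENY:
        # Fail the step on DENY.
        return {"state": "FAILED", "inserted_steps": []}
    if decision is Decision.REQUIRE_APPROVAL:
        # Insert an implicit approval gate; the step waits on it.
        gate = {"id": f"{step_id}.implicit_approval", "type": "APPROVAL_GATE"}
        return {"state": "WAITING", "inserted_steps": [gate]}
    return {"state": "COMPLETED", "inserted_steps": []}
```
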
Phase 4: Lifecycle controls and compensation
- Implement PauseRun, ResumeRun, CancelRun, CompensateRun RPCs
- Pause: stop starting new steps, let in-progress steps complete
- Cancel: cancel all in-progress objectives, no compensation
- Compensate: walk DAG in reverse topological order, execute compensation_step_id for each completed step
- Publish workflows.changes.run.pause / .resume / .cancel / .compensate CloudEvents
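
The Compensate walk can be sketched as a topological sort followed by a reversed, filtered pass. The example assumes the graph has already passed the acyclicity check and models steps as a dict from step id to compensation_step_id (or None when a step has no compensation); the function name and these shapes are hypothetical.

```python
from collections import deque


def compensation_plan(compensation_by_step, edges, completed):
    """Return compensation step ids in reverse topological order.

    compensation_by_step: dict of step_id -> compensation_step_id or None.
    edges: (from_step_id, to_step_id) pairs of an acyclic graph.
    completed: set of step ids that finished before compensation began.
    """
    indegree = {s: 0 for s in compensation_by_step}
    children = {s: [] for s in compensation_by_step}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    # Kahn's algorithm gives a forward topological order.
    ready = deque(s for s in compensation_by_step if indegree[s] == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Undo later steps first; skip steps that never completed or have no compensation.
    return [
        compensation_by_step[s]
        for s in reversed(order)
        if s in completed and compensation_by_step[s] is not None
    ]
```

Walking in reverse order means a step's effects are undone only after everything that depended on it has been undone.
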
Phase 5: Triggers
- Event trigger: NATS subscription matching WorkflowTrigger.event_type + event_subject, apply filter_expr, start run
- Webhook trigger: listen for tap events matching tap_provider + tap_entity_type + tap_action
- Schedule trigger: cron-based run creation
- Workflow trigger: sub-workflow invocation from parent step
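
For the event trigger, the matching step could look roughly like this. The sketch assumes exact matching on the CloudEvents `type` and `subject` attributes and stands in a Python predicate (`filter_fn`) for filter_expr, since this issue does not define the expression language; `EventTrigger` and `runs_to_start` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class EventTrigger:
    definition_id: str
    event_type: str
    event_subject: str
    # Stand-in for filter_expr: a predicate over the event's data payload.
    filter_fn: Optional[Callable[[dict], bool]] = None


def runs_to_start(triggers, event):
    """Return the definition ids whose trigger matches the incoming event."""
    matched = []
    for trigger in triggers:
        if event.get("type") != trigger.event_type:
            continue
        if event.get("subject") != trigger.event_subject:
            continue
        if trigger.filter_fn is not None and not trigger.filter_fn(event.get("data", {})):
            continue
        matched.append(trigger.definition_id)
    return matched
```

A caller would invoke StartRun once for each returned definition id.
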
Cross-service integration points
| Service | Integration |
| --- | --- |
| agents (registry) | Resolve agent_id or required_capability for agent steps, create delegations |
| objectives | Create objective per agent step, listen for state transitions |
| approvals | Create approval requests for gate steps, listen for decisions |
| governance | Evaluate actions for governance check steps |
| notifications | Send notifications for notification steps |
| events (NATS) | Trigger on CloudEvents, wait for events in steps, publish workflow lifecycle events |
| tap (ensemble-tap) | Trigger on webhook events from external providers |
| entities | Resolve entity_id / entity_type for the workflow's primary entity |
| meter | Attribute step costs, accumulate total_cost_usd on run |
| prompts | Resolve prompt_name for agent action steps at execution time |
| skills | Validate skill_id availability for agent action steps |
| identity | Validate initiator_user_id for manual triggers |
Contract fixtures
Canonical fixtures are already in proto/workflows/v1/testdata/:
- publish_version_response.json: the full outbound SDR pipeline (research → qualify → governance check → branch by deal value → approval gate → send outreach → notify → wait for reply)
- get_run_response.json: an in-progress workflow with completed research/qualify/governance/branch steps, waiting on manager approval for an $85K deal, with cost tracking and entity linkage
Related