workflows/v1: adopt WorkflowService in objectives #39

@haasonsaas

Summary

The workflows/v1 proto package landed in #1ef7dbe. The objectives service already owns objective lifecycle, wake scheduling, provenance, and mutation tracking. Because workflows are compositions of objectives, the orchestration logic belongs in the objectives service, next to what it coordinates.

What needs to happen

Phase 1: Workflow definitions and versioning

  • Implement CreateDefinition, GetDefinition, ListDefinitions, PublishVersion, ListVersions RPCs in objectives
  • Add workflow_definitions table: id, workspace_id, name, description, latest_version, tags (text[]), created_at, updated_at
  • Add workflow_versions table: id, definition_id, version (serial per definition), steps (jsonb), edges (jsonb), trigger (jsonb), default_retry_policy (jsonb), timeout_seconds, author, created_at
  • Validate DAG acyclicity on PublishVersion
  • Publish workflows.changes.definition.create / version.publish CloudEvents
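
The acyclicity check on PublishVersion could be done with Kahn's algorithm. The following is a minimal sketch, not the service's implementation: it assumes steps are identified by string ids and that edges can be reduced to `(from_step_id, to_step_id)` pairs, and the function name `topological_order` is hypothetical.

```python
from collections import deque


def topological_order(step_ids, edges):
    """Return step ids in topological order, or raise ValueError on a cycle.

    step_ids: list of step id strings (assumed shape).
    edges: list of (from_step_id, to_step_id) pairs (assumed shape).
    """
    indegree = {s: 0 for s in step_ids}
    successors = {s: [] for s in step_ids}
    for src, dst in edges:
        if src not in indegree or dst not in indegree:
            raise ValueError(f"edge references unknown step: {src} -> {dst}")
        successors[src].append(dst)
        indegree[dst] += 1

    # Start from steps that have no incoming edges.
    ready = deque(s for s in step_ids if indegree[s] == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in successors[step]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any step never released is on a cycle or downstream of one.
    if len(order) != len(step_ids):
        stuck = sorted(s for s in step_ids if indegree[s] > 0)
        raise ValueError(f"workflow graph is not acyclic; unresolved steps: {stuck}")
    return order
```

Rejecting the version when `ValueError` is raised keeps cyclic graphs out of workflow_versions, so the executor can assume every stored version is a DAG.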

Phase 2: Workflow execution engine

  • Implement StartRun, GetRun, ListRuns, TransitionStep RPCs
  • Add workflow_runs table matching WorkflowRun message
  • Add step_runs table matching StepRun message
  • DAG executor: on run start, identify root steps (no incoming edges), create objectives for AGENT_ACTION steps, and transition to RUNNING
  • Step completion listener: subscribe to NATS for objective state changes, approval resolutions, governance decisions, CloudEvent arrivals
  • On step completion: evaluate outgoing edges, start next steps, accumulate cost_usd
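
The executor loop described above can be sketched in memory as follows. This is an illustration under stated assumptions: the `RunState` class, the status strings, and the rule that a step starts only once all of its predecessors have completed are hypothetical, and edge condition evaluation, objective creation, and persistence are left out.

```python
from dataclasses import dataclass, field


@dataclass
class RunState:
    step_ids: list
    edges: list  # (from_step_id, to_step_id) pairs, assumed shape
    status: dict = field(default_factory=dict)  # step_id -> status string
    total_cost_usd: float = 0.0

    def __post_init__(self):
        for step_id in self.step_ids:
            self.status.setdefault(step_id, "PENDING")


def root_steps(run):
    """Steps with no incoming edges."""
    targets = {dst for _, dst in run.edges}
    return [s for s in run.step_ids if s not in targets]


def start_run(run):
    """Mark root steps RUNNING and return them."""
    started = root_steps(run)
    for step_id in started:
        run.status[step_id] = "RUNNING"
    return started


def complete_step(run, step_id, cost_usd):
    """Record completion, add the step cost, and start newly unblocked steps."""
    run.status[step_id] = "COMPLETED"
    run.total_cost_usd += cost_usd
    started = []
    for src, dst in run.edges:
        if src != step_id or run.status[dst] != "PENDING":
            continue
        predecessors = [s for s, d in run.edges if d == dst]
        # Assumed join semantics: wait for every predecessor.
        if all(run.status[p] == "COMPLETED" for p in predecessors):
            run.status[dst] = "RUNNING"
            started.append(dst)
    return started
```

In the service, `complete_step` would be driven by the NATS listener rather than called directly, and the returned step ids are where objectives or approvals would be created.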

Phase 3: Step type implementations

| Step Type | Implementation |
| --- | --- |
| AGENT_ACTION | Create objective via ObjectiveService.Create, optionally delegate via AgentService.Delegate |
| APPROVAL_GATE | Create approval via ApprovalService.RequestApproval, block until resolved |
| GOVERNANCE_CHECK | Call GovernanceService.EvaluateAction, fail on DENY, insert implicit approval on REQUIRE_APPROVAL |
| WAIT_FOR_EVENT | Subscribe to NATS with event_type_pattern + event_subject_pattern filter, block until match or timeout |
| BRANCH | Evaluate condition_expr against prior step outputs, follow matching edge |
| PARALLEL | Fan out to all child steps concurrently, complete when all children complete |
| NOTIFICATION | Send via NotificationService.Send, non-blocking |
| HUMAN_TASK | Create pending step, expose via API/product surface for manual completion |
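
As one example of a step handler, the GOVERNANCE_CHECK behavior could be expressed as a small mapping from decision to executor action. This is a sketch only: the `ALLOW` decision name, the step status strings, and the returned dictionary shape are assumptions that do not appear in this issue. Only DENY and REQUIRE_APPROVAL come from the step description.

```python
def apply_governance_decision(decision):
    """Map a governance decision to what the executor should do next.

    Returns a dict with the resulting step status and whether an implicit
    approval gate must be inserted before continuing (assumed shape).
    """
    if decision == "DENY":
        # Fail the step; downstream steps are not started.
        return {"step_status": "FAILED", "insert_approval_gate": False}
    if decision == "REQUIRE_APPROVAL":
        # The check itself passes, but an approval must resolve first.
        return {"step_status": "COMPLETED", "insert_approval_gate": True}
    if decision == "ALLOW":  # assumed name for the permissive outcome
        return {"step_status": "COMPLETED", "insert_approval_gate": False}
    raise ValueError(f"unknown governance decision: {decision!r}")
```

Raising on unknown decisions rather than defaulting to a pass keeps a new or misspelled decision value from silently letting a run proceed.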

Phase 4: Lifecycle controls and compensation

  • Implement PauseRun, ResumeRun, CancelRun, CompensateRun RPCs
  • Pause: stop starting new steps, let in-progress steps complete
  • Cancel: cancel all in-progress objectives; do not run compensation
  • Compensate: walk DAG in reverse topological order, execute compensation_step_id for each completed step
  • Publish workflows.changes.run.pause / .resume / .cancel / .compensate CloudEvents
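
The compensation walk can be sketched as a topological sort followed by a reversed pass over completed steps. This is a minimal illustration, assuming steps are given as a mapping from step id to its compensation_step_id (or `None` when a step has none) and edges as `(from_step_id, to_step_id)` pairs; the function name `compensation_plan` is hypothetical.

```python
from collections import deque


def compensation_plan(steps, edges, completed):
    """Return compensation step ids in the order they should run.

    steps: dict of step_id -> compensation_step_id or None (assumed shape).
    edges: list of (from_step_id, to_step_id) pairs (assumed shape).
    completed: set of step ids that finished before compensation began.
    """
    indegree = {s: 0 for s in steps}
    successors = {s: [] for s in steps}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    # Kahn's algorithm for the forward topological order.
    ready = deque(s for s in steps if indegree[s] == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for nxt in successors[step]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(steps):
        raise ValueError("workflow graph is not acyclic")

    # Undo the most recently reachable work first; skip steps that never
    # completed or that declare no compensation step.
    return [
        steps[s]
        for s in reversed(order)
        if s in completed and steps[s] is not None
    ]
```

For a linear research → qualify → send → notify run that stopped before notify, this yields the compensation for send first, then qualify.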

Phase 5: Triggers

  • Event trigger: NATS subscription matching WorkflowTrigger.event_type + event_subject, apply filter_expr, start run
  • Webhook trigger: listen for tap events matching tap_provider + tap_entity_type + tap_action
  • Schedule trigger: cron-based run creation
  • Workflow trigger: sub-workflow invocation from parent step
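
Event trigger matching could look like the sketch below. It assumes that event_type and event_subject may use NATS-style subject wildcards (`*` for exactly one dot-separated token, `>` for one or more trailing tokens), which this issue does not state, and it models filter_expr as an optional Python callable because the expression language is not specified here. The event is assumed to be a dict carrying the CloudEvents `type` and `subject` attributes.

```python
def subject_matches(pattern, subject):
    """Match a dot-separated subject against a NATS-style wildcard pattern."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' is only valid as the last token and needs at least one
            # remaining subject token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


def should_start_run(trigger, event, filter_fn=None):
    """Decide whether an incoming event should start a workflow run.

    trigger: dict with "event_type" and "event_subject" patterns.
    event: dict with CloudEvents "type" and "subject" attributes.
    filter_fn: hypothetical stand-in for evaluating filter_expr.
    """
    if not subject_matches(trigger["event_type"], event["type"]):
        return False
    if not subject_matches(trigger["event_subject"], event["subject"]):
        return False
    return bool(filter_fn(event)) if filter_fn is not None else True
```

Checking the cheap type and subject patterns before the filter keeps expression evaluation off the hot path for events that cannot match.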

Cross-service integration points

| Service | Integration |
| --- | --- |
| agents (registry) | Resolve agent_id or required_capability for agent steps, create delegations |
| objectives | Create objective per agent step, listen for state transitions |
| approvals | Create approval requests for gate steps, listen for decisions |
| governance | Evaluate actions for governance check steps |
| notifications | Send notifications for notification steps |
| events (NATS) | Trigger on CloudEvents, wait for events in steps, publish workflow lifecycle events |
| tap (ensemble-tap) | Trigger on webhook events from external providers |
| entities | Resolve entity_id / entity_type for the workflow's primary entity |
| meter | Attribute step costs, accumulate total_cost_usd on run |
| prompts | Resolve prompt_name for agent action steps at execution time |
| skills | Validate skill_id availability for agent action steps |
| identity | Validate initiator_user_id for manual triggers |

Contract fixtures

Canonical fixtures are already in proto/workflows/v1/testdata/:

  • publish_version_response.json — full outbound SDR pipeline: research → qualify → governance check → branch by deal value → approval gate → send outreach → notify → wait for reply
  • get_run_response.json — in-progress workflow with completed research/qualify/governance/branch steps, waiting on manager approval for an $85K deal, with cost tracking and entity linkage
