feat: enhance workflow interaction handling #474
- Added support for delivering workflow signals and events, allowing real-time interaction with running workflows.
- Implemented interaction snapshots to store and manage workflow interactions by batch ID.
- Introduced query capabilities for workflow states and current steps, improving the ability to inspect workflow status.
- Enhanced tests to cover new interaction features, ensuring robustness and correctness of the workflow system.
- Updated documentation to reflect the new capabilities and usage of signals and events in workflows.

closes: #351
Pull request overview
This PR adds first-class workflow interaction primitives (signals/events) and explicit query support to improve live inspection and operator interaction with running workflows, primarily implemented in core/karya and the in-memory queue store.
Changes:
- Introduces workflow interaction snapshots (signal/event deliveries) and wires them into workflow snapshots and step readiness gating.
- Adds explicit workflow queries (`state`, `current-step`, `current-steps`) with typed `QueryResult`.
- Extends the in-memory queue store and base queue-store interface with query + interaction delivery APIs, plus tests and updated RBS.
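To make the readiness gating described above concrete, here is a minimal, self-contained sketch. `GatedStep` and `Delivery` are hypothetical stand-ins written for this illustration and are not Karya's classes; only the option names `wait_for_signal` / `wait_for_event` and the mutual-exclusivity rule come from this PR. It assumes a step becomes ready once its prerequisites are complete and its declared interaction, if any, has been delivered.

```ruby
require 'set'

# Hypothetical stand-ins for illustration; these are not Karya's actual classes.
Delivery = Struct.new(:kind, :name, keyword_init: true)

GatedStep = Struct.new(:id, :depends_on, :wait_for_signal, :wait_for_event, keyword_init: true) do
  # A step may wait on a signal or an event, never both.
  def validate!
    raise ArgumentError, 'wait_for_signal and wait_for_event are mutually exclusive' if wait_for_signal && wait_for_event

    self
  end

  # The single interaction this step waits on, or nil when the step is ungated.
  def gate
    return Delivery.new(kind: :signal, name: wait_for_signal) if wait_for_signal
    return Delivery.new(kind: :event, name: wait_for_event) if wait_for_event

    nil
  end

  # Ready when every prerequisite has finished and the gate, if any, was delivered.
  def ready?(completed_ids, deliveries)
    return false unless depends_on.all? { |dep| completed_ids.include?(dep) }

    gate.nil? || deliveries.include?(gate)
  end
end

ship = GatedStep.new(id: 'ship', depends_on: ['pack'], wait_for_signal: 'approved').validate!
completed = Set['pack']

puts ship.ready?(completed, [])
puts ship.ready?(completed, [Delivery.new(kind: :signal, name: 'approved')])
```

The gate is compared by value (kind plus name), so in this sketch a delivery with the same name but a different kind does not unblock the step.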
Reviewed changes
Copilot reviewed 31 out of 31 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| docs/pages/workflows/signals.md | Updates workflow interaction docs wording to include operator checkpoints. |
| core/karya/lib/karya/workflow.rb | Requires new workflow interaction/query classes; extends define.step DSL with wait_for_signal / wait_for_event. |
| core/karya/lib/karya/workflow/step.rb | Adds optional interaction gates on steps and validates mutual exclusivity. |
| core/karya/lib/karya/workflow/step_snapshot.rb | Adds interaction gating metadata to step snapshots and incorporates it into ready? evaluation. |
| core/karya/lib/karya/workflow/snapshot.rb | Adds interaction lists to snapshots and derives step readiness/state using interaction deliveries. |
| core/karya/lib/karya/workflow/interaction_snapshot.rb | Adds immutable interaction delivery snapshot with JSON-compatible payload normalization. |
| core/karya/lib/karya/workflow/query_result.rb | Adds immutable typed query result for workflow queries. |
| core/karya/lib/karya/queue_store/base.rb | Adds abstract queue-store APIs for workflow query and interaction delivery. |
| core/karya/lib/karya/queue_store/bulk_mutation_report.rb | Extends allowed bulk mutation actions to include workflow interaction deliveries. |
| core/karya/lib/karya/queue_store/in_memory/internal.rb | Requires new internal workflow query / interaction support files. |
| core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb | Wires interaction requirements into workflow registration and includes interaction gating in readiness checks. |
| core/karya/lib/karya/queue_store/in_memory/internal/workflow_interactions_support.rb | Implements in-memory workflow query and signal/event delivery APIs. |
| core/karya/lib/karya/queue_store/in_memory/internal/workflow_query.rb | Implements query evaluation against workflow snapshots (state/current step(s)). |
| core/karya/lib/karya/queue_store/in_memory/internal/workflow_interaction_requirements.rb | Maps step interaction gates to concrete job-id requirements. |
| core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb | Adds storage for per-batch workflow interaction inboxes and cleanup on pruning. |
| core/karya/spec/karya/workflow/step_spec.rb | Adds step-level tests for interaction gate normalization and invalid configurations. |
| core/karya/spec/karya/workflow/step_snapshot_spec.rb | Adds tests for interaction-gated readiness and validation of interaction metadata. |
| core/karya/spec/karya/workflow/snapshot_spec.rb | Adds tests for snapshot interaction visibility, gating behavior, and validation. |
| core/karya/spec/karya/workflow/query_result_spec.rb | Adds tests for query normalization, allowed values, and timestamp validation. |
| core/karya/spec/karya/workflow/interaction_snapshot_spec.rb | Adds tests for payload deep-freeze/normalization and invalid payload/kind handling. |
| core/karya/spec/karya/queue_store_base_spec.rb | Adds base-spec assertions for new abstract queue store methods. |
| core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb | Adds in-memory integration tests for interaction delivery and workflow querying. |
| core/karya/spec/karya/queue_store/in_memory/internal/workflow_query_spec.rb | Adds focused internal unit tests for workflow query resolution. |
| core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb | Adds tests for interaction inbox storage and pruning cleanup. |
| core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb | Extends action acceptance tests for new interaction-delivery actions. |
| core/karya/sig/karya.rbs | Adds new shared types for interactions and workflow queries. |
| core/karya/sig/karya/workflow.rbs | Adds RBS for new workflow interaction/query classes and extends snapshot/step signatures. |
| core/karya/sig/karya/queue_store/base.rbs | Adds signatures for new queue-store workflow query/interaction APIs. |
| core/karya/sig/karya/queue_store/in_memory.rbs | Adds signatures for in-memory queue store workflow query/interaction APIs. |
| core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs | Extends internal workflow support signatures for queries/interactions. |
| core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs | Extends store-state signatures for interaction inbox storage. |
- Added support for registering and validating workflow interactions.
- Enhanced interaction storage to handle signals and events.
- Introduced validation methods to ensure interaction requirements are met.
- Updated tests to cover new interaction handling scenarios.
- Added handling for blocked steps in workflow queries to improve step resolution logic.
- Refactored interaction requirement validation to raise specific errors for invalid kinds.
- Updated tests to cover new scenarios for blocked steps and invalid interaction requirements.
- Refactored methods in `StoreState` to improve interaction management.
- Updated `for_batch` and `delete_by_batch` methods to handle interactions more efficiently.
- Simplified the registration of interactions by removing unnecessary complexity.
- Added a new test to ensure repeated deliveries of the same interaction are preserved in snapshots.
- Updated the logic in `current_step_ids` to filter out steps that are blocked by their prerequisites.
- Added a new test case to ensure that dependency-blocked descendants are excluded from the current blocked steps.
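The filtering rule in this commit can be sketched roughly as follows. This is not the code from `workflow_query.rb`; `StepState`, `current_blocked_step_ids`, and the state names `:blocked` / `:succeeded` are assumptions made for the illustration. The idea is that a blocked step only counts as "current" when all of its prerequisites have already succeeded, so descendants that are merely waiting on a blocked ancestor are left out.

```ruby
# Hypothetical step record; the state names are illustrative only.
StepState = Struct.new(:id, :state, :depends_on, keyword_init: true)

# Returns the ids of blocked steps whose prerequisites have all succeeded.
# Steps blocked only because an ancestor is still blocked are excluded.
def current_blocked_step_ids(steps)
  by_id = steps.to_h { |step| [step.id, step] }

  steps.select do |step|
    step.state == :blocked &&
      step.depends_on.all? { |dep| by_id.fetch(dep).state == :succeeded }
  end.map(&:id)
end

steps = [
  StepState.new(id: 'pack', state: :succeeded, depends_on: []),
  StepState.new(id: 'approve', state: :blocked, depends_on: ['pack']),   # waiting on an interaction
  StepState.new(id: 'ship', state: :blocked, depends_on: ['approve'])    # waiting on 'approve'
]

p current_blocked_step_ids(steps)
```

Here only `approve` is reported, because `ship` is blocked by a prerequisite rather than by its own gate.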
- Updated the `kind` method to use the new `Kind` class for normalization.
- Introduced a new `Kind` class to encapsulate kind validation and conversion.
- Modified tests to accept string input for kind and added checks for invalid types.
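As a rough picture of what a kind normalizer like this could look like, here is a small sketch. `InteractionKind` is a hypothetical name and the error messages are invented; the PR only states that string input is accepted and that invalid types and invalid kinds are rejected.

```ruby
# Hypothetical sketch; the PR's Kind class may be structured differently.
class InteractionKind
  ALLOWED = %i[signal event].freeze

  # Accepts a String or Symbol and returns :signal or :event.
  def self.normalize(value)
    unless value.is_a?(String) || value.is_a?(Symbol)
      raise ArgumentError, "kind must be a String or Symbol, got #{value.class}"
    end

    kind = value.to_s.strip.to_sym
    return kind if ALLOWED.include?(kind)

    raise ArgumentError, "unsupported interaction kind: #{value.inspect}"
  end
end

p InteractionKind.normalize('signal')
p InteractionKind.normalize(:event)
```

Keeping the type check separate from the allowed-value check lets callers get a distinct message for "wrong type" versus "unknown kind".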
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
docs/pages/workflows/signals.md:18
- This page now claims support for “operator checkpoints” (including “pause, resume, and approval checkpoints”), but there doesn’t appear to be any corresponding workflow pause/resume/checkpoint API in the codebase. Unless that capability exists elsewhere, this is likely overstating current behavior; consider removing the checkpoints references or linking to the concrete API/docs that implement them.
- Refactored interaction identifier normalization to improve error handling.
- Updated interaction requirements to ensure presence validation for names.
- Adjusted type definitions for interaction kinds and workflow queries to enhance clarity.
- Added tests to validate new error conditions for empty names in interactions.
- Introduced bounded interaction handling in `WorkflowInteractions` to limit the number of interactions per batch.
- Added `Inbox` class to manage interactions with a maximum size constraint.
- Enhanced `InteractionSnapshot` to validate payload size, raising errors for oversized payloads.
- Updated specs to ensure correct behavior for interaction retention and payload validation.
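A bounded inbox with payload-size validation might look something like the sketch below. `BoundedInbox`, its keyword arguments, and the limits are hypothetical. The eviction policy (drop the oldest entry once the cap is exceeded) and measuring payload size as JSON bytes are both assumptions; the commit message does not say how the real `Inbox` behaves at capacity.

```ruby
require 'json'

# Hypothetical sketch of a per-batch inbox with a size cap.
class BoundedInbox
  def initialize(max_size:, max_payload_bytes:)
    @max_size = max_size
    @max_payload_bytes = max_payload_bytes
    @entries = []
  end

  # Rejects oversized payloads, then appends and trims to the cap.
  def deliver(kind:, name:, payload: {})
    bytes = JSON.generate(payload).bytesize
    if bytes > @max_payload_bytes
      raise ArgumentError, "payload is #{bytes} bytes, limit is #{@max_payload_bytes}"
    end

    @entries << { kind: kind, name: name, payload: payload }.freeze
    @entries.shift while @entries.size > @max_size # assumed policy: evict oldest
    self
  end

  # Whether any retained delivery matches the given kind and name.
  def includes?(kind:, name:)
    @entries.any? { |entry| entry[:kind] == kind && entry[:name] == name }
  end

  def to_a
    @entries.dup
  end
end

inbox = BoundedInbox.new(max_size: 2, max_payload_bytes: 64)
inbox.deliver(kind: :signal, name: 'a').deliver(kind: :signal, name: 'b').deliver(kind: :event, name: 'c')
p inbox.to_a.map { |entry| entry[:name] }
```

Repeated deliveries of the same interaction are kept as separate entries in this sketch, which matches the earlier commit's note that repeated deliveries are preserved in snapshots.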
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
docs/pages/workflows/signals.md:18
- The page now claims support for “operator checkpoints” and lists “pause, resume, and approval checkpoints”, but this PR (and the current core implementation) only introduces signals/events delivery and query support. Either remove the checkpoint references here or add/link the actual checkpoint feature documentation once it exists, to avoid documenting unsupported behavior.
- Updated the type of `@value` in `QueryResult` to `Karya::workflow_query_value` for better clarity.
- Introduced `DeliveryIndex` and `MatchingJobIndex` classes to improve the organization of interaction handling.
- Added `ReceivedAtByJobId` class to manage delivery and matching job indices effectively.
- Refactored methods to enhance readability and maintainability of interaction-related logic.
- Introduced `includes?` method to check if a specific interaction was delivered based on kind and name.
- Refactored `for_batch` method to utilize `with_inbox` for cleaner code and fallback handling.
- Updated `delete_by_batch` to use `with_inbox` for improved deletion logic.
- Enhanced `Inbox` class to manage interaction counts and provide a method for checking interaction presence.
- Added tests to ensure correct behavior of interaction delivery checks and interaction snapshot validations.
- Added methods to track the received timestamps for workflow interactions.
- Introduced `received_at_for` and `workflow_interaction_received_at` to retrieve timestamps.
- Updated internal state management to store interaction timestamps separately.
- Enhanced specs to validate the new functionality and ensure correct behavior.
- Added `WORKFLOW_STATES` constant to `QueryResult` for better state validation.
- Updated `normalize_state` method to ensure value is a valid workflow state.
- Enhanced tests to validate error handling for unsupported workflow states.
- Improved documentation formatting for clarity on workflow signals.
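The validation pattern described here can be sketched as below. The constant name `SKETCH_WORKFLOW_STATES` and the list of states are placeholders invented for this example; the actual contents of `WORKFLOW_STATES` are not shown in this conversation, and the real `normalize_state` may accept different input types.

```ruby
# Placeholder list; the real WORKFLOW_STATES values are not shown in this PR conversation.
SKETCH_WORKFLOW_STATES = %i[pending running blocked succeeded failed cancelled].freeze

# Converts String/Symbol input to a Symbol and rejects anything outside the allowed list.
def normalize_state(value)
  state = value.respond_to?(:to_sym) ? value.to_sym : nil
  return state if SKETCH_WORKFLOW_STATES.include?(state)

  raise ArgumentError, "unsupported workflow state: #{value.inspect}"
end

p normalize_state('running')
```

Checking membership against a frozen constant keeps the allowed set in one place, so the query result and its tests cannot drift apart.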
- Changed the variable name from `@snapshot` to `@to_a` for clarity.
- This change improves the readability of the code by using a more descriptive name for the array of interaction snapshots.
- Added `interaction_supported_keys` to manage interaction types.
- Introduced `configure` method for `workflow_interactions` to set supported keys.
- Updated `WorkflowRegistration` to utilize the new build method for better initialization.
- Enhanced tests to verify interaction tracking based on declared requirements.
- Normalize supported keys input in the `configure` method to accept both `Hash` and `Enumerable` types.
- Update the `supported?` method to utilize `interaction_supported_keys` for better performance and clarity.
- Add a new test to verify that supported interaction keys can be configured correctly and are recognized during interaction delivery.
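One way to normalize both input shapes is sketched below. `normalize_supported_keys` and `supported?` here are free-standing hypothetical helpers, and the assumed shapes (a Hash of kind to names, or an Enumerable of `[kind, name]` pairs) are guesses; the commit does not spell out the real argument format.

```ruby
require 'set'

# Normalizes supported keys to a frozen Set of [kind, name] pairs.
# Assumed input shapes: { signal: ['approved'] } or [[:signal, 'approved']].
def normalize_supported_keys(input)
  pairs =
    case input
    when Hash then input.flat_map { |kind, names| Array(names).map { |name| [kind, name] } }
    when Enumerable then input.to_a
    else raise ArgumentError, 'supported_keys must be a Hash or Enumerable'
    end

  pairs.map { |kind, name| [kind.to_sym, name.to_s] }.to_set.freeze
end

# Set membership keeps the lookup cheap regardless of how many keys are declared.
def supported?(supported_keys, kind, name)
  supported_keys.include?([kind.to_sym, name.to_s])
end

keys = normalize_supported_keys(signal: %w[approved rejected], event: 'payment_received')
p supported?(keys, :signal, 'approved')
p supported?(keys, :event, 'refund_issued')
```

Because `Hash` is itself `Enumerable`, the `Hash` branch has to come first in the `case`.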
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
docs/pages/workflows/signals.md:18
- The docs now claim workflow interaction supports “operator checkpoints” and lists “pause, resume, and approval checkpoints”, but there doesn’t appear to be any workflow pause/resume/checkpoint API in this PR or elsewhere in the codebase. This reads like a supported feature and could mislead users; either document how checkpoints map onto the new signal/event/query surfaces, or remove the checkpoint references until the capability exists.
- Updated `configure` method to accept an additional parameter type for `supported_keys`, allowing for more flexible input.
- Added private visibility to `fetch_workflow_registration` method for better encapsulation.
- Cleaned up private method declarations for improved code organization.
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
docs/pages/workflows/signals.md:18
- The doc now states workflows support "operator checkpoints" and lists "pause, resume, and approval checkpoints", but this PR’s runtime changes appear to only add signals/events delivery + queries. If checkpoints aren’t implemented yet, consider removing/qualifying this wording to avoid implying supported behavior that doesn’t exist.
- Refactored the normalization of execution identifiers in `InteractionSnapshot` and `Snapshot` classes to directly call `Workflow` methods.
- Removed redundant local normalization methods to streamline the code and improve maintainability.
- Ensured consistent handling of interaction names across different workflow components.
closes: #351