diff --git a/core/karya/lib/karya/queue_store/base.rb b/core/karya/lib/karya/queue_store/base.rb index 0a46b92..86fcb0b 100644 --- a/core/karya/lib/karya/queue_store/base.rb +++ b/core/karya/lib/karya/queue_store/base.rb @@ -90,6 +90,32 @@ def workflow_snapshot(batch_id:, now:) raise NotImplementedError, "#{self.class} must implement ##{__method__}" end + # Execute one explicit workflow state query. + def query_workflow(batch_id:, query:, now:) + _batch_id = batch_id + _query = query + _now = now + raise NotImplementedError, "#{self.class} must implement ##{__method__}" + end + + # Persist one workflow signal delivery for a nonterminal workflow batch. + def deliver_workflow_signal(batch_id:, signal:, payload:, now:) + _batch_id = batch_id + _signal = signal + _payload = payload + _now = now + raise NotImplementedError, "#{self.class} must implement ##{__method__}" + end + + # Persist one workflow external event delivery for a nonterminal workflow batch. + def deliver_workflow_event(batch_id:, event:, payload:, now:) + _batch_id = batch_id + _event = event + _payload = payload + _now = now + raise NotImplementedError, "#{self.class} must implement ##{__method__}" + end + # Enqueue a child workflow batch for one declared parent child step. def enqueue_child_workflow( parent_batch_id:, diff --git a/core/karya/lib/karya/queue_store/bulk_mutation_report.rb b/core/karya/lib/karya/queue_store/bulk_mutation_report.rb index 2c6bca8..16406bf 100644 --- a/core/karya/lib/karya/queue_store/bulk_mutation_report.rb +++ b/core/karya/lib/karya/queue_store/bulk_mutation_report.rb @@ -17,6 +17,8 @@ class BulkMutationReport replay_dead_letter_jobs retry_dead_letter_jobs discard_dead_letter_jobs + deliver_workflow_signal + deliver_workflow_event enqueue_child_workflow rollback_workflow sync_child_workflows @@ -130,7 +132,8 @@ def skipped_reason_error_message def action_error_message 'action must be one of :enqueue_many, :retry_jobs, :cancel_jobs, :dead_letter_jobs, ' \ - ':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, :enqueue_child_workflow, :rollback_workflow, ' \ + ':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, :deliver_workflow_signal, ' \ + ':deliver_workflow_event, :enqueue_child_workflow, :rollback_workflow, ' \ ':retry_workflow_steps, :dead_letter_workflow_steps, :replay_workflow_steps, ' \ ':retry_dead_letter_workflow_steps, :discard_workflow_steps, or :sync_child_workflows' end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal.rb b/core/karya/lib/karya/queue_store/in_memory/internal.rb index 6213d52..fc356b4 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal.rb @@ -27,4 +27,7 @@ require_relative 'internal/store_state' require_relative 'internal/uniqueness_support' require_relative 'internal/workflow_child_ids' +require_relative 'internal/workflow_interaction_requirements' +require_relative 'internal/workflow_interactions_support' +require_relative 'internal/workflow_query' require_relative 'internal/workflow_support' diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb b/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb index f17657b..dc63772 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb @@ -12,9 +12,9 @@ module Internal # Internal mutable state for the single-process queue store. class StoreState MAX_TRACKED_FAIR_QUEUE_LISTS = 128 - attr_reader :executions_by_token, :batches_by_id, + :batch_id_by_job_id, :breaker_failures_by_scope, :breaker_states_by_scope, :execution_tokens_in_order, @@ -34,6 +34,7 @@ class StoreState :stuck_job_recoveries_by_id, :workflow_children, :workflow_dependency_job_ids_by_job_id, + :workflow_interactions, :workflow_rollback_batch_ids, :workflow_registrations_by_batch_id, :workflow_rollbacks_by_batch_id @@ -43,12 +44,32 @@ class StoreState :workflow_id, :step_job_ids, :dependency_job_ids_by_job_id, + :interaction_requirements_by_job_id, + :interaction_supported_keys, :compensation_jobs_by_step_id, :child_workflow_ids_by_step_id - ) + ) do + def self.build( + workflow_id:, + step_job_ids:, + dependency_job_ids_by_job_id:, + interaction_requirements_by_job_id:, + compensation_jobs_by_step_id:, + child_workflow_ids_by_step_id: + ) + new( + workflow_id, + step_job_ids.dup.freeze, + dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze }.freeze, + interaction_requirements_by_job_id.transform_values { |requirement| requirement.dup.freeze }.freeze, + interaction_requirements_by_job_id.values.to_h { |requirement| [[requirement.fetch(:kind), requirement.fetch(:name)], true] }.freeze, + compensation_jobs_by_step_id.dup.freeze, + child_workflow_ids_by_step_id.dup.freeze + ) + end + end # Immutable owner-local rollback metadata for one workflow batch. WorkflowRollback = Struct.new(:batch_id, :rollback_batch_id, :reason, :requested_at, :compensation_job_ids) - # Owner-local child workflow relationship registry. class WorkflowChildren # Immutable owner-local child workflow relationship metadata. @@ -156,6 +177,116 @@ def delete_relationship(relationship, remove_parent_batch: true) end end + # Owner-local workflow interaction inbox keyed by workflow batch id. + class WorkflowInteractions + EMPTY = [].freeze + MAX_INTERACTIONS_PER_BATCH = 100 + private_constant :EMPTY, :MAX_INTERACTIONS_PER_BATCH + + def initialize + @by_batch_id = {} + @max_interactions_per_batch = MAX_INTERACTIONS_PER_BATCH + end + + def for_batch(batch_id) + with_inbox(batch_id, fallback: EMPTY, &:to_a) + end + + def includes?(batch_id:, kind:, name:) + with_inbox(batch_id, fallback: false) { |inbox| inbox.includes?(kind:, name:) } + end + + def received_at_for(batch_id:, kind:, name:) + with_inbox(batch_id, fallback: nil) { |inbox| inbox.received_at_for(kind:, name:) } + end + + def register(batch_id:, interaction:) + current_inbox(batch_id).append(interaction).to_a + end + + def configure(batch_id:, supported_keys:) + current_inbox(batch_id).configure(supported_keys:) + end + + def delete_by_batch(batch_id) + with_inbox(batch_id, fallback: EMPTY, delete: true, &:to_a) + end + + private + + attr_reader :max_interactions_per_batch + + def current_inbox(batch_id) + @by_batch_id[batch_id] ||= Inbox.new(max_size: max_interactions_per_batch) + end + + def with_inbox(batch_id, fallback:, delete: false) + inbox = delete ? @by_batch_id.delete(batch_id) : @by_batch_id[batch_id] + return fallback unless inbox + + yield inbox + end + + # Owner-local bounded interaction buffer for one workflow batch. + class Inbox + def initialize(max_size:) + @max_size = max_size + @interactions = [] + @to_a = EMPTY + @received_at_by_key = {} + @supported_keys = {}.freeze + end + + def append(interaction) + interactions << interaction + track([interaction.kind, interaction.name], interaction.received_at) + interactions.shift if interactions.length > max_size + @to_a = nil + self + end + + def configure(supported_keys:) + normalized_supported_keys = + if supported_keys.is_a?(Hash) + supported_keys.keys.to_h { |key| [key, true] } + else + supported_keys.to_h { |key| [key, true] } + end + + @supported_keys = normalized_supported_keys.freeze + rebuild_received_at_index + self + end + + def to_a + @to_a ||= interactions.dup.freeze + end + + def includes?(kind:, name:) + received_at_by_key.key?([kind, name]) + end + + def received_at_for(kind:, name:) + received_at_by_key[[kind, name]] + end + + private + + attr_reader :interactions, :max_size, :received_at_by_key, :supported_keys + + def rebuild_received_at_index + received_at_by_key.clear + interactions.each { |interaction| track([interaction.kind, interaction.name], interaction.received_at) } + end + + def track(key, received_at) + return unless supported_keys.key?(key) + + received_at_by_key[key] = received_at + end + end + end + # Decides whether a terminal child batch must remain because its parent is still active. class ChildBatchRetention def initialize(batches_by_id:, workflow_children:, terminal_batch:) @@ -237,6 +368,10 @@ def workflow_children workflow_indexes.fetch(:workflow_children) end + def workflow_interactions + workflow_indexes.fetch(:workflow_interactions) + end + def workflow_dependency_job_ids_by_job_id workflow_indexes.fetch(:workflow_dependency_job_ids_by_job_id) end @@ -263,6 +398,7 @@ def cleanup_batch(batch_id:, batch:) }, workflow_indexes: { workflow_children:, + workflow_interactions:, workflow_rollback_batch_ids:, workflow_registrations_by_batch_id:, workflow_rollbacks_by_batch_id: @@ -279,16 +415,19 @@ def register_workflow( step_job_ids:, dependency_job_ids_by_job_id:, compensation_jobs_by_step_id:, + interaction_requirements_by_job_id: {}, child_workflow_ids_by_step_id: {} ) - registration = WorkflowRegistration.new( - workflow_id, - step_job_ids.dup.freeze, - dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze }.freeze, - compensation_jobs_by_step_id.dup.freeze, - child_workflow_ids_by_step_id.dup.freeze + registration = WorkflowRegistration.build( + workflow_id:, + step_job_ids:, + dependency_job_ids_by_job_id:, + interaction_requirements_by_job_id:, + compensation_jobs_by_step_id:, + child_workflow_ids_by_step_id: ).freeze workflow_registrations_by_batch_id[batch_id] = registration + workflow_interactions.configure(batch_id:, supported_keys: registration.interaction_supported_keys) child_workflow_ids_by_step_id.each do |step_id, child_workflow_id| workflow_children.register_expected_child(step_job_ids.fetch(step_id), child_workflow_id) end @@ -301,10 +440,26 @@ def register_workflow_dependencies(dependency_job_ids_by_job_id) ) end + def register_workflow_interaction(batch_id:, interaction:) + workflow_interactions.register(batch_id:, interaction:) + end + def workflow_dependency_job_ids_for(job_id) workflow_dependency_job_ids_by_job_id[job_id] end + def workflow_interactions_for(batch_id) + workflow_interactions.for_batch(batch_id) + end + + def workflow_interaction_delivered?(batch_id:, kind:, name:) + workflow_interactions.includes?(batch_id:, kind:, name:) + end + + def workflow_interaction_received_at(batch_id:, kind:, name:) + workflow_interactions.received_at_for(batch_id:, kind:, name:) + end + def register_workflow_rollback(batch_id:, rollback_batch_id:, reason:, requested_at:, compensation_job_ids:) workflow_rollback_batch_ids[rollback_batch_id] = true workflow_rollbacks_by_batch_id[batch_id] = WorkflowRollback.new( @@ -322,6 +477,7 @@ def register_workflow_rollback(batch_id:, rollback_batch_id:, reason:, requested private_constant :ChildBatchRetention, :TerminalBatchPruner, :WorkflowChildren, + :WorkflowInteractions, :WorkflowMetadata, :WorkflowRegistration, :WorkflowRollback @@ -351,11 +507,15 @@ def initialize(expired_tombstone_limit:) @stuck_job_recoveries_by_id = {} @terminal_batch_ids_index = {} @terminal_batch_ids_in_order = [] - @workflow_children = WorkflowChildren.new - @workflow_dependency_job_ids_by_job_id = {} - @workflow_rollback_batch_ids = {} - @workflow_registrations_by_batch_id = {} - @workflow_rollbacks_by_batch_id = {} + workflow_state = { + workflow_children: WorkflowChildren.new, + workflow_dependency_job_ids_by_job_id: {}, + workflow_interactions: WorkflowInteractions.new, + workflow_rollback_batch_ids: {}, + workflow_registrations_by_batch_id: {}, + workflow_rollbacks_by_batch_id: {} + } + workflow_state.each { |name, value| instance_variable_set(:"@#{name}", value) } end def queue_job_ids_for(queue) @@ -519,6 +679,7 @@ def prune_terminal_batches(retention_limit, changed_job: nil) workflow_indexes: { workflow_dependency_job_ids_by_job_id:, workflow_children:, + workflow_interactions:, workflow_rollback_batch_ids:, workflow_registrations_by_batch_id:, workflow_rollbacks_by_batch_id: @@ -606,6 +767,7 @@ def cleanup_stale_batch_membership def cleanup_workflow_registration registration = workflow_registrations_by_batch_id.delete(batch_id) rollback = workflow_rollbacks_by_batch_id.delete(batch_id) + workflow_interactions.delete_by_batch(batch_id) cleanup_child_workflows(registration) workflow_rollback_batch_ids.delete(rollback.rollback_batch_id) if rollback registration @@ -656,6 +818,10 @@ def workflow_rollback_batch_ids workflow_indexes.fetch(:workflow_rollback_batch_ids) end + def workflow_interactions + workflow_indexes.fetch(:workflow_interactions) + end + def workflow_children workflow_indexes.fetch(:workflow_children) end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_interaction_requirements.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_interaction_requirements.rb new file mode 100644 index 0000000..000aace --- /dev/null +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_interaction_requirements.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module QueueStore + class InMemory + module Internal + module WorkflowSupport + # Maps workflow signal/event gates by concrete workflow job id. + class InteractionRequirements + def initialize(definition:, step_job_ids:) + @definition = definition + @step_job_ids = step_job_ids + end + + def to_h + definition.steps.each_with_object({}) do |workflow_step, requirements| + requirement = StepRequirement.new(workflow_step).to_h + next unless requirement + + requirements[step_job_ids.fetch(workflow_step.id)] = requirement + end.freeze + end + + private + + attr_reader :definition, :step_job_ids + + # Resolves one workflow step's optional interaction gate. + class StepRequirement + def initialize(workflow_step) + @workflow_step = workflow_step + end + + def to_h + wait_for_signal = workflow_step.wait_for_signal + return { kind: :signal, name: wait_for_signal }.freeze if wait_for_signal + + wait_for_event = workflow_step.wait_for_event + return { kind: :event, name: wait_for_event }.freeze if wait_for_event + + nil + end + + private + + attr_reader :workflow_step + end + + private_constant :StepRequirement + end + end + end + end + end +end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_interactions_support.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_interactions_support.rb new file mode 100644 index 0000000..fa7403f --- /dev/null +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_interactions_support.rb @@ -0,0 +1,115 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module QueueStore + class InMemory + module Internal + # Owner-local workflow query and interaction delivery support. + module WorkflowSupport + def query_workflow(batch_id:, query:, now:) + normalized_now = normalize_time(:now, now, error_class: Workflow::InvalidExecutionError) + normalized_batch_id = Workflow.send(:normalize_batch_identifier, :batch_id, batch_id) + + @mutex.synchronize do + recover_in_flight_locked(normalized_now) + batch = fetch_batch(normalized_batch_id) + registration = fetch_workflow_registration(batch.id) + jobs = fetch_batch_jobs(batch) + snapshot = WorkflowSnapshotBuilder.new(batch:, registration:, jobs:, now: normalized_now, state:).to_snapshot + WorkflowQuery.new(snapshot:, query:, queried_at: normalized_now).to_result + end + end + + def deliver_workflow_signal(batch_id:, signal:, payload:, now:) + deliver_workflow_interaction( + action: :deliver_workflow_signal, + batch_id:, + name: signal, + payload:, + now:, + kind: :signal + ) + end + + def deliver_workflow_event(batch_id:, event:, payload:, now:) + deliver_workflow_interaction( + action: :deliver_workflow_event, + batch_id:, + name: event, + payload:, + now:, + kind: :event + ) + end + + private + + def deliver_workflow_interaction(action:, batch_id:, name:, payload:, now:, kind:) + normalized_now = normalize_time(:now, now, error_class: Workflow::InvalidExecutionError) + normalized_batch_id = Workflow.send(:normalize_batch_identifier, :batch_id, batch_id) + interaction = Workflow::InteractionSnapshot.new(kind:, name:, payload:, received_at: normalized_now) + + @mutex.synchronize do + recover_in_flight_locked(normalized_now) + batch = fetch_batch(normalized_batch_id) + workflow_batch_id = batch.id + registration = fetch_workflow_registration(workflow_batch_id) + jobs = fetch_batch_jobs(batch) + snapshot = WorkflowSnapshotBuilder.new(batch:, registration:, jobs:, now: normalized_now, state:).to_snapshot + validate_workflow_interaction_delivery(snapshot, workflow_batch_id) + validate_workflow_interaction_support(registration, interaction.kind, interaction.name, workflow_batch_id) + state.register_workflow_interaction(batch_id: workflow_batch_id, interaction:) + BulkMutationReport.new( + action:, + performed_at: normalized_now, + requested_job_ids: [], + changed_jobs: [], + skipped_jobs: [] + ) + end + end + + def validate_workflow_interaction_delivery(snapshot, batch_id) + return unless WORKFLOW_INTERACTION_TERMINAL_STATES.include?(snapshot.state) + + raise Workflow::InvalidExecutionError, "workflow batch #{batch_id.inspect} is terminal and cannot receive interactions" + end + + def validate_workflow_interaction_support(registration, interaction_kind, interaction_name, batch_id) + supported = SupportedInteraction.new( + registration:, + interaction_kind:, + interaction_name: + ).supported? + return if supported + + raise Workflow::InvalidExecutionError, + "workflow batch #{batch_id.inspect} does not support #{interaction_kind} #{interaction_name.inspect}" + end + + # Checks whether one delivered interaction is declared by the workflow. + class SupportedInteraction + def initialize(registration:, interaction_kind:, interaction_name:) + @registration = registration + @interaction_kind = interaction_kind + @interaction_name = interaction_name + end + + def supported? + registration.interaction_supported_keys.key?([interaction_kind, interaction_name]) + end + + private + + attr_reader :interaction_kind, :interaction_name, :registration + end + end + end + end + end +end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_query.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_query.rb new file mode 100644 index 0000000..5f01181 --- /dev/null +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_query.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module QueueStore + class InMemory + module Internal + module WorkflowSupport + # Resolves one explicit workflow query against a workflow snapshot. + class WorkflowQuery + def initialize(snapshot:, query:, queried_at:) + @snapshot = snapshot + @query = query + @queried_at = queried_at + end + + def to_result + Workflow::QueryResult.new(query:, value:, queried_at:) + end + + private + + attr_reader :queried_at, :query, :snapshot + + def value + case normalized_query + when 'state' + snapshot.state + when 'current-step' + current_step_ids.first + when 'current-steps' + current_step_ids + else + raise Workflow::InvalidExecutionError, "unsupported workflow query #{normalized_query.inspect}" + end + end + + def normalized_query + @normalized_query ||= Workflow.send(:normalize_execution_identifier, :query, query).then do |value| + raise Workflow::InvalidExecutionError, "unsupported workflow query #{value.inspect}" unless %w[state current-step current-steps].include?(value) + + value + end + end + + def current_step_ids + CurrentSteps.new(snapshot.steps).to_a + end + + # Resolves the step ids that best represent current workflow progress. + class CurrentSteps + def initialize(steps) + @steps = steps + end + + def to_a + active_step_ids = step_ids_for(&:active?) + return active_step_ids unless active_step_ids.empty? + + ready_step_ids = step_ids_for(&:ready?) + return ready_step_ids unless ready_step_ids.empty? + + steps.select do |step| + step.blocked? && step.prerequisite_states.values.all?(:succeeded) + end.map(&:step_id).freeze + end + + private + + attr_reader :steps + + def step_ids_for(&) + steps.select(&).map(&:step_id).freeze + end + end + + private_constant :CurrentSteps + end + end + end + end + end +end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb index 08de5c9..6a6fa08 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb @@ -11,6 +11,8 @@ class InMemory module Internal # Owner-local workflow enqueue and prerequisite readiness support. module WorkflowSupport + WORKFLOW_INTERACTION_TERMINAL_STATES = %i[succeeded failed cancelled].freeze + def enqueue_workflow(definition:, jobs_by_step_id:, batch_id:, now:, compensation_jobs_by_step_id: {}) normalized_now = normalize_time(:now, now, error_class: Workflow::InvalidExecutionError) @@ -29,13 +31,15 @@ def enqueue_workflow(definition:, jobs_by_step_id:, batch_id:, now:, compensatio expire_reservations_locked(normalized_now) queued_jobs = jobs.map { |job| enqueue_validated_job(job, normalized_now) } dependency_job_ids_by_job_id = binding.dependency_job_ids_by_job_id + step_job_ids = StepJobIds.new(definition:, jobs:).to_h store_batch(batch) state.register_workflow_dependencies(dependency_job_ids_by_job_id) state.register_workflow( batch_id: workflow_batch_id, workflow_id: definition.id, - step_job_ids: StepJobIds.new(definition:, jobs:).to_h, + step_job_ids:, dependency_job_ids_by_job_id:, + interaction_requirements_by_job_id: InteractionRequirements.new(definition:, step_job_ids:).to_h, compensation_jobs_by_step_id: binding.compensation_jobs_by_step_id, child_workflow_ids_by_step_id: WorkflowChildIds.new(definition).to_h ) @@ -425,6 +429,9 @@ def to_snapshot jobs:, child_workflow_ids_by_step_id: registration.child_workflow_ids_by_step_id, child_workflows: child_workflow_snapshots, + interaction_requirements_by_job_id: registration.interaction_requirements_by_job_id, + interaction_received_at_by_job_id: interaction_received_at_by_job_id, + interactions: interaction_snapshots, parent: parent_snapshot, rollback: rollback_snapshot ) @@ -454,6 +461,21 @@ def parent_snapshot ChildWorkflowSnapshotBuilder.new(relationship:, resolver: child_state_resolver).to_snapshot end + def interaction_snapshots + state.workflow_interactions_for(batch.id) + end + + def interaction_received_at_by_job_id + registration.interaction_requirements_by_job_id.each_with_object({}) do |(job_id, requirement), received_at_by_job_id| + received_at = state.workflow_interaction_received_at( + batch_id: batch.id, + kind: requirement.fetch(:kind), + name: requirement.fetch(:name) + ) + received_at_by_job_id[job_id] = received_at if received_at + end.freeze + end + def child_state_resolver @child_state_resolver ||= WorkflowChildState.new(state:, now:) end @@ -563,11 +585,15 @@ def validation_error_message "workflow batch #{batch_id} has active jobs and cannot be rolled back" end end - private_constant :ChildWorkflowSnapshotBuilder, :RollbackSnapshotAttributes, :RollbackState, :WorkflowSnapshotBuilder + private_constant :ChildWorkflowSnapshotBuilder, + :RollbackSnapshotAttributes, + :RollbackState, + :WorkflowSnapshotBuilder def workflow_dependencies_satisfied?(job, now:) prerequisite_job_ids = state.workflow_dependency_job_ids_for(job.id) return false unless workflow_child_satisfied?(job, now:) + return false unless workflow_interaction_satisfied?(job) return true unless prerequisite_job_ids prerequisite_job_ids.all? do |prerequisite_job_id| @@ -587,6 +613,24 @@ def workflow_child_satisfied?(job, now:) child_workflow_state(relationship.child_batch_id, now:) == :succeeded end + + def workflow_interaction_satisfied?(job) + job_id = job.id + batch_id = state.batch_id_by_job_id[job_id] + return true unless batch_id + + registration = state.workflow_registrations_by_batch_id[batch_id] + return true unless registration + + requirement = registration.interaction_requirements_by_job_id[job_id] + return true unless requirement + + state.workflow_interaction_delivered?( + batch_id:, + kind: requirement.fetch(:kind), + name: requirement.fetch(:name) + ) + end end end end diff --git a/core/karya/lib/karya/workflow.rb b/core/karya/lib/karya/workflow.rb index 4b9acbe..0b8b474 100644 --- a/core/karya/lib/karya/workflow.rb +++ b/core/karya/lib/karya/workflow.rb @@ -13,6 +13,8 @@ require_relative 'workflow/dependency' require_relative 'workflow/definition' require_relative 'workflow/execution_binding' +require_relative 'workflow/interaction_snapshot' +require_relative 'workflow/query_result' require_relative 'workflow/rollback_snapshot' require_relative 'workflow/step' require_relative 'workflow/step_snapshot' @@ -76,8 +78,28 @@ def initialize(id) @steps = [] end - def step(id, handler:, arguments: {}, depends_on: nil, compensate_with: nil, compensation_arguments: {}, child_workflow: nil) - steps << Step.new(id:, handler:, arguments:, depends_on:, compensate_with:, compensation_arguments:, child_workflow:) + def step( + id, + handler:, + arguments: {}, + depends_on: nil, + compensate_with: nil, + compensation_arguments: {}, + child_workflow: nil, + wait_for_signal: nil, + wait_for_event: nil + ) + steps << Step.new( + id:, + handler:, + arguments:, + depends_on:, + compensate_with:, + compensation_arguments:, + child_workflow:, + wait_for_signal:, + wait_for_event: + ) nil end diff --git a/core/karya/lib/karya/workflow/interaction_snapshot.rb b/core/karya/lib/karya/workflow/interaction_snapshot.rb new file mode 100644 index 0000000..67ab053 --- /dev/null +++ b/core/karya/lib/karya/workflow/interaction_snapshot.rb @@ -0,0 +1,160 @@ +# frozen_string_literal: true + +require 'json' + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module Workflow + # Immutable inspection view of one workflow interaction delivery. + class InteractionSnapshot + MAX_PAYLOAD_BYTES = 16 * 1024 + KINDS = %i[signal event].freeze + + attr_reader :kind, :name, :payload, :received_at + + def initialize(**attributes) + attributes = Attributes.new(attributes) + @kind = attributes.kind + @name = attributes.name + @payload = attributes.payload + @received_at = attributes.received_at + freeze + end + + # Validates and exposes interaction snapshot attributes. + class Attributes + REQUIRED_ATTRIBUTES = %i[kind name payload received_at].freeze + + def initialize(attributes) + @attributes = attributes + validate_keys + end + + def kind + normalized_kind = Kind.new(fetch(:kind)).to_sym + return normalized_kind if KINDS.include?(normalized_kind) + + raise InvalidExecutionError, 'kind must be :signal or :event' + end + + def name + Workflow.send(:normalize_execution_identifier, :name, fetch(:name)) + end + + def payload + Payload.new(fetch(:payload)).to_h + end + + def received_at + Timestamp.new(:received_at, fetch(:received_at)).to_time + end + + private + + attr_reader :attributes + + def fetch(name) + attributes.fetch(name) { raise ArgumentError, "missing keyword: :#{name}" } + end + + def validate_keys + unknown_keys = attributes.keys - REQUIRED_ATTRIBUTES + return if unknown_keys.empty? + + raise ArgumentError, "unknown keyword: :#{unknown_keys.first}" + end + end + + # Normalizes one interaction kind. + class Kind + def initialize(value) + @value = value + end + + def to_sym + raise InvalidExecutionError, 'kind must be :signal or :event' unless value.is_a?(String) || value.is_a?(Symbol) + + value.to_sym + end + + private + + attr_reader :value + end + + # Normalizes and deep-freezes a JSON-compatible interaction payload. + class Payload + def initialize(payload) + @payload = payload + end + + def to_h + raise InvalidExecutionError, 'payload must be a Hash' unless payload.is_a?(Hash) + + normalized = normalize_hash(payload) + validate_size(normalized) + end + + private + + attr_reader :payload + + def validate_size(normalized) + payload_bytesize = JSON.generate(normalized).bytesize + return normalized if payload_bytesize <= MAX_PAYLOAD_BYTES + + raise InvalidExecutionError, "payload exceeds #{MAX_PAYLOAD_BYTES} bytes" + rescue JSON::GeneratorError => e + raise InvalidExecutionError, 'payload must be JSON-encodable', cause: e + end + + def normalize_hash(hash) + hash.each_with_object({}) do |(key, value), normalized| + raise InvalidExecutionError, 'payload keys must be Strings' unless key.is_a?(String) + + normalized[key.dup.freeze] = normalize_value(value) + end.freeze + end + + def normalize_value(value) + case value + when NilClass, TrueClass, FalseClass, Numeric + value + when String + value.dup.freeze + when Array + value.map { |entry| normalize_value(entry) }.freeze + when Hash + normalize_hash(value) + else + raise InvalidExecutionError, 'payload values must be JSON-compatible' + end + end + end + + # Normalizes timestamps into immutable values. + class Timestamp + def initialize(name, value) + @name = name + @value = value + end + + def to_time + return value.dup.freeze if value.is_a?(Time) + + raise InvalidExecutionError, "#{name} must be a Time" + end + + private + + attr_reader :name, :value + end + + private_constant :Attributes, :Kind, :KINDS, :MAX_PAYLOAD_BYTES, :Payload, :Timestamp + end + end +end diff --git a/core/karya/lib/karya/workflow/query_result.rb b/core/karya/lib/karya/workflow/query_result.rb new file mode 100644 index 0000000..ab9637b --- /dev/null +++ b/core/karya/lib/karya/workflow/query_result.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module Workflow + # Immutable result for one explicit workflow query. + class QueryResult + SUPPORTED_QUERIES = %w[state current-step current-steps].freeze + WORKFLOW_STATES = %i[pending running blocked succeeded failed cancelled].freeze + + attr_reader :query, :queried_at, :value + + def initialize(query:, value:, queried_at:) + @query = Query.new(query).to_s + @value = Value.new(@query, value).normalize + @queried_at = Timestamp.new(:queried_at, queried_at).to_time + freeze + end + + # Normalizes the explicit query name. + class Query + def initialize(value) + @value = value + end + + def to_s + normalized_query = Workflow.send(:normalize_execution_identifier, :query, value) + return normalized_query if SUPPORTED_QUERIES.include?(normalized_query) + + raise InvalidExecutionError, "unsupported workflow query #{normalized_query.inspect}" + end + + private + + attr_reader :value + end + + # Validates and freezes query results by query type. + class Value + def initialize(query, value) + @query = query + @value = value + end + + def normalize + case query + when 'state' + normalize_state + when 'current-step' + normalize_current_step + when 'current-steps' + normalize_current_steps + else + raise InvalidExecutionError, "unsupported workflow query #{query.inspect}" + end + end + + private + + attr_reader :query, :value + + def normalize_state + raise InvalidExecutionError, 'workflow query "state" must return a Symbol' unless value.is_a?(Symbol) + raise InvalidExecutionError, 'workflow query "state" must return a workflow state' unless WORKFLOW_STATES.include?(value) + + value + end + + def normalize_current_step + case value + when NilClass + nil + else + Workflow.send(:normalize_execution_identifier, :step_id, value) + end + end + + def normalize_current_steps + raise InvalidExecutionError, 'workflow query "current-steps" must return an Array' unless value.is_a?(Array) + + value.map do |step_id| + Workflow.send(:normalize_execution_identifier, :step_id, step_id) + end.freeze + end + end + + # Normalizes timestamps into immutable values. + class Timestamp + def initialize(name, value) + @name = name + @value = value + end + + def to_time + return value.dup.freeze if value.is_a?(Time) + + raise InvalidExecutionError, "#{name} must be a Time" + end + + private + + attr_reader :name, :value + end + + private_constant :Query, :SUPPORTED_QUERIES, :Timestamp, :Value, :WORKFLOW_STATES + end + end +end diff --git a/core/karya/lib/karya/workflow/snapshot.rb b/core/karya/lib/karya/workflow/snapshot.rb index 0eafe4d..5c6a1d2 100644 --- a/core/karya/lib/karya/workflow/snapshot.rb +++ b/core/karya/lib/karya/workflow/snapshot.rb @@ -20,7 +20,15 @@ class Snapshot dependency_job_ids_by_job_id jobs ].freeze - OPTIONAL_ATTRIBUTES = %i[child_workflow_ids_by_step_id child_workflows parent rollback].freeze + OPTIONAL_ATTRIBUTES = %i[ + child_workflow_ids_by_step_id + child_workflows + interactions + interaction_requirements_by_job_id + interaction_received_at_by_job_id + parent + rollback + ].freeze SUPPORTED_ATTRIBUTES = (REQUIRED_ATTRIBUTES + OPTIONAL_ATTRIBUTES).freeze def initialize(**attributes) @@ -28,10 +36,24 @@ def initialize(**attributes) @identity = attributes.identity @membership = attributes.membership @child_relationships = attributes.child_relationships - @step_inspection = StepInspection.new(identity:, membership:, child_relationships:) + @interactions = attributes.interactions + grouped_interactions = interactions.group_by(&:kind) + @signals = grouped_interactions.fetch(:signal, []).freeze + @events = grouped_interactions.fetch(:event, []).freeze + interaction_state = InteractionState.new( + interaction_requirements_by_job_id: attributes.interaction_requirements_by_job_id, + interaction_received_at_by_job_id: attributes.interaction_received_at_by_job_id, + interactions: + ) + @step_inspection = StepInspection.new( + identity:, + membership:, + child_relationships:, + interaction_state: + ) @parent = attributes.parent @rollback = attributes.rollback - @summary_data = SummaryData.new(membership, child_relationships) + @summary_data = SummaryData.new(membership, step_inspection) freeze end @@ -99,6 +121,8 @@ def fetch_child_workflow(step_id) child_relationships.fetch_child_workflow(step_id) end + attr_reader :events, :interactions, :parent, :rollback, :signals + def state_counts summary_data.state_counts end @@ -119,8 +143,6 @@ def state summary_data.state end - attr_reader :parent, :rollback - # Validates and exposes snapshot construction attributes. class Attributes def initialize(attributes) @@ -162,6 +184,18 @@ def child_relationships ) end + def interactions + InteractionList.new(attributes.fetch(:interactions, [])).to_a + end + + def interaction_requirements_by_job_id + InteractionRequirements.new(attributes.fetch(:interaction_requirements_by_job_id, {})).to_h + end + + def interaction_received_at_by_job_id + InteractionReceivedAtByJobId.new(attributes.fetch(:interaction_received_at_by_job_id, {})).to_h + end + def parent value = attributes.fetch(:parent, nil) raise InvalidExecutionError, 'parent must be Karya::Workflow::ChildWorkflowSnapshot' if value && !value.is_a?(ChildWorkflowSnapshot) @@ -276,10 +310,12 @@ def build_step_states # Builds ordered per-step runtime inspection values. class StepInspection - def initialize(identity:, membership:, child_relationships:) + def initialize(identity:, membership:, child_relationships:, interaction_state:) @identity = identity @membership = membership @child_relationships = child_relationships + @interaction_requirements_by_job_id = interaction_state.interaction_requirements_by_job_id + @interaction_received_at_by_job_id = interaction_state.received_at_by_job_id @steps = build_steps @steps_by_id = @steps.to_h { |step_snapshot| [step_snapshot.step_id, step_snapshot] }.freeze freeze @@ -301,11 +337,13 @@ def fetch_step(step_id) private - attr_reader :child_relationships, :identity, :membership, :steps_by_id + attr_reader :child_relationships, :identity, :interaction_received_at_by_job_id, + :interaction_requirements_by_job_id, :membership, :steps_by_id def build_steps membership.step_job_ids.map do |step_id, job_id| prerequisite_job_ids = membership.dependency_job_ids_by_job_id.fetch(job_id, []) + interaction_requirement = interaction_requirements_by_job_id[job_id] StepSnapshot.new( workflow_id: identity.workflow_id, batch_id: identity.batch_id, @@ -315,7 +353,10 @@ def build_steps prerequisite_job_ids:, prerequisite_states: prerequisite_states_for(prerequisite_job_ids), child_workflow_id: child_relationships.child_workflow_id(step_id), - child_workflow: child_relationships.child_workflow(step_id) + child_workflow: child_relationships.child_workflow(step_id), + interaction_kind: interaction_requirement&.fetch(:kind, nil), + interaction_name: interaction_requirement&.fetch(:name, nil), + interaction_received_at: interaction_received_at_by_job_id[job_id] ) end.freeze end @@ -328,6 +369,31 @@ def prerequisite_states_for(prerequisite_job_ids) end end + # Groups interaction requirements, history, and readiness timestamps. + class InteractionState + attr_reader :interaction_requirements_by_job_id + + def initialize(interaction_requirements_by_job_id:, interaction_received_at_by_job_id:, interactions:) + @interaction_requirements_by_job_id = interaction_requirements_by_job_id + @interaction_received_at_by_job_id = interaction_received_at_by_job_id + @interactions = interactions + freeze + end + + def received_at_by_job_id + return interaction_received_at_by_job_id unless interaction_received_at_by_job_id.empty? + + InteractionDeliveries.new( + interaction_requirements_by_job_id:, + interactions: + ).to_h + end + + private + + attr_reader :interaction_received_at_by_job_id, :interactions + end + # Normalizes child workflow declarations by parent step id. class ChildWorkflowIds def initialize(child_workflow_ids_by_step_id) @@ -370,23 +436,232 @@ def to_a attr_reader :child_workflows end + # Normalizes workflow interaction snapshots. + class InteractionList + def initialize(interactions) + @interactions = interactions + end + + def to_a + raise InvalidExecutionError, 'interactions must be an Array' unless interactions.is_a?(Array) + + interactions.each do |interaction| + raise InvalidExecutionError, 'interactions entries must be Karya::Workflow::InteractionSnapshot' unless interaction.is_a?(InteractionSnapshot) + end + interactions.dup.freeze + end + + private + + attr_reader :interactions + end + + # Normalizes workflow interaction requirements keyed by concrete job id. + class InteractionRequirements + def initialize(interaction_requirements_by_job_id) + @interaction_requirements_by_job_id = interaction_requirements_by_job_id + end + + def to_h + raise InvalidExecutionError, 'interaction_requirements_by_job_id must be a Hash' unless interaction_requirements_by_job_id.is_a?(Hash) + + interaction_requirements_by_job_id.each_with_object({}) do |(job_id, requirement), normalized| + normalized_job_id = Workflow.send(:normalize_execution_identifier, :job_id, job_id) + raise InvalidExecutionError, "duplicate interaction requirement job #{normalized_job_id.inspect}" if normalized.key?(normalized_job_id) + + normalized[normalized_job_id] = Requirement.new(requirement).to_h + end.freeze + end + + private + + attr_reader :interaction_requirements_by_job_id + + # Normalizes one interaction requirement entry. + class Requirement + def initialize(requirement) + @requirement = requirement + end + + def to_h + raise InvalidExecutionError, 'interaction requirement must be a Hash' unless requirement.is_a?(Hash) + + kind = requirement.fetch(:kind) { raise InvalidExecutionError, 'interaction requirement must include :kind' } + name = requirement.fetch(:name) { raise InvalidExecutionError, 'interaction requirement must include :name' } + raise_invalid_kind unless kind.is_a?(String) || kind.is_a?(Symbol) + + kind = kind.to_sym + raise_invalid_kind unless %i[signal event].include?(kind) + + { + kind:, + name: Workflow.send(:normalize_execution_identifier, :interaction_name, name) + }.freeze + end + + private + + attr_reader :requirement + + def raise_invalid_kind + raise InvalidExecutionError, 'interaction requirement kind must be :signal or :event' + end + end + + private_constant :Requirement + end + + # Normalizes delivered interaction timestamps keyed by concrete job id. + class InteractionReceivedAtByJobId + def initialize(interaction_received_at_by_job_id) + @interaction_received_at_by_job_id = interaction_received_at_by_job_id + end + + def to_h + raise InvalidExecutionError, 'interaction_received_at_by_job_id must be a Hash' unless interaction_received_at_by_job_id.is_a?(Hash) + + interaction_received_at_by_job_id.each_with_object({}) do |(job_id, received_at), normalized| + normalized_job_id = Workflow.send(:normalize_execution_identifier, :job_id, job_id) + raise InvalidExecutionError, "duplicate interaction delivery job #{normalized_job_id.inspect}" if normalized.key?(normalized_job_id) + + normalized[normalized_job_id] = Timestamp.new(:interaction_received_at, received_at).to_time + end.freeze + end + + private + + attr_reader :interaction_received_at_by_job_id + end + + # Resolves interaction delivery timestamps for gated workflow jobs. + class InteractionDeliveries + def initialize(interaction_requirements_by_job_id:, interactions:) + @interaction_requirements_by_job_id = interaction_requirements_by_job_id + @interactions = interactions + end + + def to_h + delivery_index = DeliveryIndex.new(interactions).to_h + matching_job_index = MatchingJobIndex.new(interaction_requirements_by_job_id).to_h + ReceivedAtByJobId.new(delivery_index:, matching_job_index:).to_h + end + + private + + attr_reader :interaction_requirements_by_job_id, :interactions + + # Builds delivery timestamps keyed by interaction identity. + class DeliveryIndex + def initialize(interactions) + @interactions = interactions + end + + def to_h + interactions.to_h { |interaction| DeliveryEntry.new(interaction).to_pair } + end + + private + + attr_reader :interactions + + # Converts one interaction snapshot into its identity and timestamp. + class DeliveryEntry + def initialize(interaction) + @interaction = interaction + end + + def to_pair + [[interaction.kind, interaction.name], interaction.received_at] + end + + private + + attr_reader :interaction + end + end + + # Builds workflow job ids keyed by required interaction identity. + class MatchingJobIndex + def initialize(interaction_requirements_by_job_id) + @interaction_requirements_by_job_id = interaction_requirements_by_job_id + @index = {} + end + + def to_h + interaction_requirements_by_job_id.each do |job_id, requirement| + register(RequirementKey.new(requirement).to_a, job_id) + end + index.freeze + end + + private + + attr_reader :index, :interaction_requirements_by_job_id + + def register(interaction_key, job_id) + job_ids = index[interaction_key] + if job_ids + job_ids << job_id + else + index[interaction_key] = [job_id] + end + end + + # Converts one interaction requirement into its lookup key. + class RequirementKey + def initialize(requirement) + @requirement = requirement + end + + def to_a + [requirement.fetch(:kind), requirement.fetch(:name)] + end + + private + + attr_reader :requirement + end + end + + # Builds received-at timestamps keyed by gated workflow job id. + class ReceivedAtByJobId + def initialize(delivery_index:, matching_job_index:) + @delivery_index = delivery_index + @matching_job_index = matching_job_index + @received_at_by_job_id = {} + end + + def to_h + delivery_index.each do |interaction_key, received_at| + register(interaction_key, received_at) + end + received_at_by_job_id.freeze + end + + private + + attr_reader :delivery_index, :matching_job_index, :received_at_by_job_id + + def register(interaction_key, received_at) + matching_job_index.fetch(interaction_key, []).each do |job_id| + received_at_by_job_id[job_id] = received_at + end + end + end + end + # Groups snapshot state summary fields. class SummaryData attr_reader :completed_count, :failed_count, :state, :state_counts, :total_count - def initialize(membership, child_relationships) + def initialize(membership, step_inspection) jobs = membership.jobs summary = Summary.new(jobs) @state_counts = summary.state_counts @total_count = jobs.length @completed_count = summary.completed_count @failed_count = summary.failed_count - @state = State.new( - jobs:, - step_job_ids: membership.step_job_ids, - dependency_job_ids_by_job_id: membership.dependency_job_ids_by_job_id, - child_relationships: - ).to_sym + @state = State.new(jobs:, steps: step_inspection.steps).to_sym freeze end end @@ -535,13 +810,9 @@ def active? # Derives workflow state from current job states and prerequisites. class State - def initialize(jobs:, step_job_ids:, dependency_job_ids_by_job_id:, child_relationships:) + def initialize(jobs:, steps:) @jobs = jobs - @step_job_ids = step_job_ids - @dependency_job_ids_by_job_id = dependency_job_ids_by_job_id - @child_relationships = child_relationships - @jobs_by_id = jobs.to_h { |job| [job.id, job] } - @step_id_by_job_id = step_job_ids.to_h { |step_id, job_id| [job_id, step_id] } + @steps = steps end def to_sym @@ -558,7 +829,7 @@ def to_sym private - attr_reader :child_relationships, :dependency_job_ids_by_job_id, :jobs, :jobs_by_id, :step_id_by_job_id + attr_reader :jobs, :steps def failed? jobs.any? { |job| FAILED_STATES.include?(job.state) } @@ -581,27 +852,7 @@ def progressed? end def blocked? - jobs.any? do |job| - WAITING_STATES.include?(job.state) && (dependency_blocked?(job) || child_workflow_blocked?(job)) - end - end - - def dependency_blocked?(job) - dependency_job_ids_by_job_id.fetch(job.id, []).any? do |dependency_job_id| - dependency_job = jobs_by_id[dependency_job_id] - !dependency_job || dependency_job.state != :succeeded - end - end - - def child_workflow_blocked?(job) - step_id = step_id_by_job_id.fetch(job.id) - child_workflow_id = child_relationships.child_workflow_id(step_id) - return false unless child_workflow_id - - child_workflow = child_relationships.child_workflow(step_id) - return true unless child_workflow - - child_workflow.child_state != :succeeded + steps.any?(&:blocked?) end end @@ -614,6 +865,11 @@ def child_workflow_blocked?(job) :DependencyJobIds, :FAILED_STATES, :Identity, + :InteractionDeliveries, + :InteractionList, + :InteractionReceivedAtByJobId, + :InteractionRequirements, + :InteractionState, :JobState, :JobList, :Membership, diff --git a/core/karya/lib/karya/workflow/step.rb b/core/karya/lib/karya/workflow/step.rb index 66cb8b4..6eb2c00 100644 --- a/core/karya/lib/karya/workflow/step.rb +++ b/core/karya/lib/karya/workflow/step.rb @@ -17,7 +17,9 @@ class Step :compensation_arguments, :depends_on, :handler, - :id + :id, + :wait_for_event, + :wait_for_signal def initialize(id:, handler:, **options) @id = Workflow.send(:normalize_identifier, :step_id, id) @@ -32,7 +34,10 @@ def initialize(id:, handler:, **options) step_id: @id, handler: compensation_handler_label ).normalize + @wait_for_signal = InteractionName.new(:wait_for_signal, normalized_options.wait_for_signal).normalize + @wait_for_event = InteractionName.new(:wait_for_event, normalized_options.wait_for_event).normalize validate_compensation_configuration + validate_interaction_configuration freeze end @@ -46,7 +51,15 @@ def child_workflow? # Centralizes optional constructor field defaults and key validation. class Options - ALLOWED_KEYS = %i[arguments depends_on compensate_with compensation_arguments child_workflow].freeze + ALLOWED_KEYS = %i[ + arguments + depends_on + compensate_with + compensation_arguments + child_workflow + wait_for_signal + wait_for_event + ].freeze def initialize(options) @options = options @@ -73,6 +86,14 @@ def compensation_arguments options.fetch(:compensation_arguments, {}) end + def wait_for_signal + options.fetch(:wait_for_signal, nil) + end + + def wait_for_event + options.fetch(:wait_for_event, nil) + end + private attr_reader :options @@ -185,7 +206,28 @@ def normalize attr_reader :value end - private_constant :Arguments, :ChildWorkflow, :CompensationHandler, :Dependencies, :Options + # Normalizes one optional interaction gate name. + class InteractionName + def initialize(field_name, value) + @field_name = field_name + @value = value + end + + def normalize + case value + when NilClass + nil + else + Workflow.send(:normalize_identifier, field_name, value) + end + end + + private + + attr_reader :field_name, :value + end + + private_constant :Arguments, :ChildWorkflow, :CompensationHandler, :Dependencies, :InteractionName, :Options private @@ -195,6 +237,12 @@ def validate_compensation_configuration raise InvalidDefinitionError, "workflow step #{id.inspect} cannot define compensation_arguments without compensate_with" end + def validate_interaction_configuration + return unless wait_for_signal && wait_for_event + + raise InvalidDefinitionError, "workflow step #{id.inspect} cannot wait for both signal and event" + end + def compensation_handler_label compensate_with || 'compensation' end diff --git a/core/karya/lib/karya/workflow/step_snapshot.rb b/core/karya/lib/karya/workflow/step_snapshot.rb index f181611..b534e98 100644 --- a/core/karya/lib/karya/workflow/step_snapshot.rb +++ b/core/karya/lib/karya/workflow/step_snapshot.rb @@ -11,25 +11,21 @@ module Workflow class StepSnapshot WAITING_STATES = %i[queued submission].freeze - attr_reader :batch_id, - :child_workflow, + attr_reader :child_workflow, :child_workflow_id, :job, - :job_id, :prerequisite_job_ids, - :prerequisite_states, - :state, - :step_id, - :workflow_id + :prerequisite_states def initialize(**attributes) attributes = Attributes.new(attributes) - @workflow_id = attributes.workflow_id - @batch_id = attributes.batch_id - @step_id = attributes.step_id - @job_id = attributes.job_id + @identity = Identity.new( + workflow_id: attributes.workflow_id, + batch_id: attributes.batch_id, + step_id: attributes.step_id, + job_id: attributes.job_id + ) @job = attributes.job - @state = @job.state @prerequisite_job_ids = attributes.prerequisite_job_ids @prerequisite_states = PrerequisiteStates.new( prerequisite_job_ids: @prerequisite_job_ids, @@ -39,15 +35,36 @@ def initialize(**attributes) @child_workflow = ChildWorkflow.new( child_workflow: attributes.child_workflow, child_workflow_id: @child_workflow_id, - parent_batch_id: @batch_id, - parent_step_id: @step_id, - parent_job_id: @job_id + parent_batch_id: batch_id, + parent_step_id: step_id, + parent_job_id: job_id ).to_snapshot + @interaction = Interaction.new( + kind: attributes.interaction_kind, + name: attributes.interaction_name, + received_at: attributes.interaction_received_at + ) freeze end + def workflow_id = identity.workflow_id + + def batch_id = identity.batch_id + + def step_id = identity.step_id + + def job_id = identity.job_id + + def state = job.state + + def interaction_kind = interaction.kind + + def interaction_name = interaction.name + + def interaction_received_at = interaction.received_at + def ready? - waiting? && prerequisites_succeeded? && child_workflow_succeeded? + waiting? && prerequisites_succeeded? && child_workflow_succeeded? && interaction_satisfied? end def blocked? @@ -77,7 +94,13 @@ class Attributes prerequisite_job_ids prerequisite_states ].freeze - OPTIONAL_ATTRIBUTES = %i[child_workflow_id child_workflow].freeze + OPTIONAL_ATTRIBUTES = %i[ + child_workflow_id + child_workflow + interaction_kind + interaction_name + interaction_received_at + ].freeze SUPPORTED_ATTRIBUTES = (REQUIRED_ATTRIBUTES + OPTIONAL_ATTRIBUTES).freeze def initialize(attributes) @@ -124,6 +147,18 @@ def child_workflow attributes.fetch(:child_workflow, nil) end + def interaction_kind + InteractionKindValue.new(attributes.fetch(:interaction_kind, nil)).to_sym + end + + def interaction_name + OptionalIdentifier.new(:interaction_name, attributes.fetch(:interaction_name, nil)).to_s + end + + def interaction_received_at + OptionalTimestamp.new(:interaction_received_at, attributes.fetch(:interaction_received_at, nil)).to_time + end + private attr_reader :attributes @@ -172,6 +207,19 @@ def validate_identity end end + # Groups the normalized identity fields for one step snapshot. + class Identity + attr_reader :batch_id, :job_id, :step_id, :workflow_id + + def initialize(workflow_id:, batch_id:, step_id:, job_id:) + @workflow_id = workflow_id + @batch_id = batch_id + @step_id = step_id + @job_id = job_id + freeze + end + end + # Validates the concrete job backing a step snapshot. class JobEntry def initialize(job_id:, job:) @@ -258,10 +306,137 @@ def validate_membership(normalized_states) end end - private_constant :Attributes, :ChildWorkflow, :JobEntry, :JobIdList, :PrerequisiteStates, :WAITING_STATES + # Groups one optional workflow interaction gate and its delivery state. + class Interaction + attr_reader :kind, :name, :received_at + + def initialize(kind:, name:, received_at:) + @kind = kind + @name = name + @received_at = received_at + validate_presence + validate_timestamp_dependency + freeze + end + + private + + def validate_presence + return if [kind, name].all?(&:nil?) + return if [kind, name].none?(&:nil?) + + raise InvalidExecutionError, 'interaction_kind and interaction_name must both be present or both be nil' + end + + def validate_timestamp_dependency + return if [received_at].compact.empty? + return if name + + raise InvalidExecutionError, 'interaction_received_at requires interaction_kind and interaction_name' + end + end + + # Normalizes one optional interaction kind. + class InteractionKindValue + def initialize(value) + @value = value + end + + def to_sym + return nil unless value + + raise_invalid_kind unless value.is_a?(String) || value.is_a?(Symbol) + + kind = value.to_sym + return kind if %i[signal event].include?(kind) + + raise_invalid_kind + end + + private + + attr_reader :value + + def raise_invalid_kind + raise InvalidExecutionError, 'interaction_kind must be :signal or :event' + end + end + + # Normalizes one optional identifier field. + class OptionalIdentifier + def initialize(field_name, value) + @field_name = field_name + @value = value + end + + def to_s + return nil unless value + + normalize_execution_identifier + end + + private + + attr_reader :field_name, :value + + def normalize_execution_identifier + Workflow.send(:normalize_execution_identifier, field_name, value) + end + end + + # Normalizes one optional timestamp field. + class OptionalTimestamp + def initialize(field_name, value) + @field_name = field_name + @value = value + end + + def to_time + return nil unless value + + Timestamp.new(field_name, value).to_time + end + + private + + attr_reader :field_name, :value + end + + # Normalizes timestamps into immutable values. + class Timestamp + def initialize(name, value) + @name = name + @value = value + end + + def to_time + return value.dup.freeze if value.is_a?(Time) + + raise InvalidExecutionError, "#{name} must be a Time" + end + + private + + attr_reader :name, :value + end + + private_constant :Attributes, + :ChildWorkflow, + :Identity, + :Interaction, + :InteractionKindValue, + :JobEntry, + :JobIdList, + :OptionalIdentifier, + :OptionalTimestamp, + :PrerequisiteStates, + :Timestamp, + :WAITING_STATES private + attr_reader :identity, :interaction + def waiting? WAITING_STATES.include?(state) end @@ -276,6 +451,10 @@ def child_workflow_succeeded? child_workflow.child_state == :succeeded end + + def interaction_satisfied? + interaction_name ? !!interaction_received_at : true + end end end end diff --git a/core/karya/sig/karya.rbs b/core/karya/sig/karya.rbs index a018986..9675435 100644 --- a/core/karya/sig/karya.rbs +++ b/core/karya/sig/karya.rbs @@ -26,21 +26,43 @@ module Karya type normalized_state_name_array = Array[normalized_state_name] type normalized_transition_map = Hash[normalized_state_name, normalized_state_name_array] type child_workflow_snapshot_attribute_value = state_name | Karya::workflow_state + type interaction_kind = :signal | :event + type interaction_kind_input = state_name + type workflow_query = "state" | "current-step" | "current-steps" + type workflow_query_value = workflow_state | String? | Array[String] + type workflow_query_input_value = workflow_state | state_name? | Array[state_name] + type interaction_requirement = { kind: interaction_kind, name: String } + type interaction_requirement_input = { kind: interaction_kind_input, name: state_name } + type interaction_payload_value = + nil | + bool | + Numeric | + String | + Array[interaction_payload_value] | + Hash[String, interaction_payload_value] + type interaction_payload = Hash[String, interaction_payload_value] + type interaction_snapshot_attribute_value = interaction_kind_input | state_name | interaction_payload | Time type step_snapshot_attribute_value = state_name | Job | Array[state_name] | Hash[state_name, state_name?] | + interaction_kind_input | Workflow::ChildWorkflowSnapshot | + Time | nil type workflow_snapshot_attribute_value = state_name | Time | Hash[state_name, state_name] | Hash[state_name, state_name_array] | + Hash[state_name, interaction_requirement_input] | + Hash[state_name, Time] | Array[Job] | Array[Workflow::ChildWorkflowSnapshot] | + Array[Workflow::InteractionSnapshot] | Workflow::ChildWorkflowSnapshot | + Workflow::InteractionSnapshot | Workflow::RollbackSnapshot | nil type handler_parameter = [Symbol, Symbol?] @@ -202,6 +224,8 @@ module Karya :replay_dead_letter_jobs | :retry_dead_letter_jobs | :discard_dead_letter_jobs | + :deliver_workflow_signal | + :deliver_workflow_event | :enqueue_child_workflow | :rollback_workflow | :sync_child_workflows | diff --git a/core/karya/sig/karya/queue_store/base.rbs b/core/karya/sig/karya/queue_store/base.rbs index aa98ca0..947cc8d 100644 --- a/core/karya/sig/karya/queue_store/base.rbs +++ b/core/karya/sig/karya/queue_store/base.rbs @@ -22,6 +22,25 @@ module Karya ) -> BulkMutationReport def workflow_snapshot: (batch_id: Karya::state_name, now: Time) -> Karya::Workflow::Snapshot + def query_workflow: ( + batch_id: Karya::state_name, + query: Karya::state_name, + now: Time + ) -> Karya::Workflow::QueryResult + + def deliver_workflow_signal: ( + batch_id: Karya::state_name, + signal: Karya::state_name, + payload: Karya::interaction_payload, + now: Time + ) -> BulkMutationReport + + def deliver_workflow_event: ( + batch_id: Karya::state_name, + event: Karya::state_name, + payload: Karya::interaction_payload, + now: Time + ) -> BulkMutationReport def enqueue_child_workflow: ( parent_batch_id: Karya::state_name, diff --git a/core/karya/sig/karya/queue_store/in_memory.rbs b/core/karya/sig/karya/queue_store/in_memory.rbs index 4a4a73e..d684dac 100644 --- a/core/karya/sig/karya/queue_store/in_memory.rbs +++ b/core/karya/sig/karya/queue_store/in_memory.rbs @@ -45,6 +45,23 @@ module Karya ?compensation_jobs_by_step_id: Hash[Karya::state_name, Job] ) -> BulkMutationReport def workflow_snapshot: (batch_id: Karya::state_name, now: Time) -> Karya::Workflow::Snapshot + def query_workflow: ( + batch_id: Karya::state_name, + query: Karya::state_name, + now: Time + ) -> Karya::Workflow::QueryResult + def deliver_workflow_signal: ( + batch_id: Karya::state_name, + signal: Karya::state_name, + payload: Karya::interaction_payload, + now: Time + ) -> BulkMutationReport + def deliver_workflow_event: ( + batch_id: Karya::state_name, + event: Karya::state_name, + payload: Karya::interaction_payload, + now: Time + ) -> BulkMutationReport def enqueue_child_workflow: ( parent_batch_id: Karya::state_name, parent_step_id: Karya::state_name, diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs index bb297e1..17d85bb 100644 --- a/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs +++ b/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs @@ -31,12 +31,14 @@ module Karya @terminal_batch_ids_in_order: Array[String] @workflow_children: WorkflowChildren @workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] + @workflow_interactions: WorkflowInteractions @workflow_rollback_batch_ids: Hash[String, bool] @workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration] @workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] attr_reader breaker_failures_by_scope: Hash[String, Array[Time]] attr_reader batches_by_id: Hash[String, Karya::Workflow::Batch] + attr_reader batch_id_by_job_id: Hash[String, String] attr_reader breaker_states_by_scope: Hash[String, Karya::circuit_breaker_runtime_state] attr_reader executions_by_token: Hash[String, Reservation] attr_reader execution_tokens_in_order: Array[String] @@ -56,6 +58,7 @@ module Karya attr_reader stuck_job_recoveries_by_id: Hash[String, Karya::stuck_job_recovery] attr_reader workflow_children: WorkflowChildren attr_reader workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] + attr_reader workflow_interactions: WorkflowInteractions attr_reader workflow_rollback_batch_ids: Hash[String, bool] attr_reader workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration] attr_reader workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -87,13 +90,18 @@ module Karya def register_batch: (Karya::Workflow::Batch batch) -> Karya::Workflow::Batch def prune_terminal_batches: (Integer retention_limit, ?changed_job: Job?) -> Array[String] def register_workflow_dependencies: (Hash[String, Array[String]] dependency_job_ids_by_job_id) -> Hash[String, Array[String]] + def register_workflow_interaction: (batch_id: String, interaction: Karya::Workflow::InteractionSnapshot) -> Array[Karya::Workflow::InteractionSnapshot] def workflow_dependency_job_ids_for: (String job_id) -> Array[String]? + def workflow_interactions_for: (String batch_id) -> Array[Karya::Workflow::InteractionSnapshot] + def workflow_interaction_delivered?: (batch_id: String, kind: Karya::interaction_kind, name: String) -> bool + def workflow_interaction_received_at: (batch_id: String, kind: Karya::interaction_kind, name: String) -> Time? def register_workflow: ( batch_id: String, workflow_id: String, step_job_ids: Hash[String, String], dependency_job_ids_by_job_id: Hash[String, Array[String]], compensation_jobs_by_step_id: Hash[String, Job], + ?interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement], ?child_workflow_ids_by_step_id: Hash[String, String] ) -> WorkflowRegistration def register_workflow_rollback: ( @@ -112,13 +120,26 @@ module Karya @workflow_id: String @step_job_ids: Hash[String, String] @dependency_job_ids_by_job_id: Hash[String, Array[String]] + @interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + @interaction_supported_keys: Hash[[Karya::interaction_kind, String], bool] @compensation_jobs_by_step_id: Hash[String, Job] @child_workflow_ids_by_step_id: Hash[String, String] + def self.build: ( + workflow_id: String, + step_job_ids: Hash[String, String], + dependency_job_ids_by_job_id: Hash[String, Array[String]], + interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement], + compensation_jobs_by_step_id: Hash[String, Job], + child_workflow_ids_by_step_id: Hash[String, String] + ) -> WorkflowRegistration + def initialize: ( String workflow_id, Hash[String, String] step_job_ids, Hash[String, Array[String]] dependency_job_ids_by_job_id, + Hash[String, Karya::interaction_requirement] interaction_requirements_by_job_id, + Hash[[Karya::interaction_kind, String], bool] interaction_supported_keys, Hash[String, Job] compensation_jobs_by_step_id, Hash[String, String] child_workflow_ids_by_step_id ) -> void @@ -126,6 +147,8 @@ module Karya attr_reader workflow_id: String attr_reader step_job_ids: Hash[String, String] attr_reader dependency_job_ids_by_job_id: Hash[String, Array[String]] + attr_reader interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + attr_reader interaction_supported_keys: Hash[[Karya::interaction_kind, String], bool] attr_reader compensation_jobs_by_step_id: Hash[String, Job] attr_reader child_workflow_ids_by_step_id: Hash[String, String] end @@ -139,6 +162,7 @@ module Karya } @workflow_indexes: { workflow_children: WorkflowChildren, + workflow_interactions: WorkflowInteractions, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -153,6 +177,7 @@ module Karya }, workflow_indexes: { workflow_children: WorkflowChildren, + workflow_interactions: WorkflowInteractions, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -167,6 +192,7 @@ module Karya }, workflow_indexes: { workflow_children: WorkflowChildren, + workflow_interactions: WorkflowInteractions, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -184,6 +210,7 @@ module Karya } attr_reader workflow_indexes: { workflow_children: WorkflowChildren, + workflow_interactions: WorkflowInteractions, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -196,6 +223,7 @@ module Karya def workflow_dependency_job_ids_by_job_id: () -> Hash[String, Array[String]] def workflow_children: () -> WorkflowChildren def workflow_rollback_batch_ids: () -> Hash[String, bool] + def workflow_interactions: () -> WorkflowInteractions def workflow_registrations_by_batch_id: () -> Hash[String, WorkflowRegistration] def workflow_rollbacks_by_batch_id: () -> Hash[String, WorkflowRollback] end @@ -274,6 +302,50 @@ module Karya end end + class WorkflowInteractions + @by_batch_id: Hash[String, WorkflowInteractions::Inbox] + EMPTY: Array[Karya::Workflow::InteractionSnapshot] + + def initialize: () -> void + def for_batch: (String batch_id) -> Array[Karya::Workflow::InteractionSnapshot] + def includes?: (batch_id: String, kind: Karya::interaction_kind, name: String) -> bool + def register: ( + batch_id: String, + interaction: Karya::Workflow::InteractionSnapshot + ) -> Array[Karya::Workflow::InteractionSnapshot] + def configure: ( + batch_id: String, + supported_keys: Hash[[Karya::interaction_kind, String], bool] | Array[[Karya::interaction_kind, String]] + ) -> WorkflowInteractions::Inbox + def delete_by_batch: (String batch_id) -> Array[Karya::Workflow::InteractionSnapshot] + + private + + def current_inbox: (String batch_id) -> WorkflowInteractions::Inbox + + class Inbox + @interactions: Array[Karya::Workflow::InteractionSnapshot] + @received_at_by_key: Hash[[Karya::interaction_kind, String], Time] + @max_size: Integer + @supported_keys: Hash[[Karya::interaction_kind, String], bool] + @to_a: Array[Karya::Workflow::InteractionSnapshot]? + + def initialize: (max_size: Integer) -> void + def append: (Karya::Workflow::InteractionSnapshot interaction) -> Inbox + def configure: (supported_keys: Hash[[Karya::interaction_kind, String], bool] | Array[[Karya::interaction_kind, String]]) -> Inbox + def to_a: () -> Array[Karya::Workflow::InteractionSnapshot] + def includes?: (kind: Karya::interaction_kind, name: String) -> bool + def received_at_for: (kind: Karya::interaction_kind, name: String) -> Time? + + private + + attr_reader interactions: Array[Karya::Workflow::InteractionSnapshot] + attr_reader max_size: Integer + attr_reader received_at_by_key: Hash[[Karya::interaction_kind, String], Time] + attr_reader supported_keys: Hash[[Karya::interaction_kind, String], bool] + end + end + def trim_fair_queue_history: () -> void def prune_expired_reservation_tokens: () -> bool? end diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs index 3f14cfa..51bc12a 100644 --- a/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs +++ b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs @@ -11,6 +11,23 @@ module Karya ?compensation_jobs_by_step_id: Hash[Karya::state_name, Job] ) -> BulkMutationReport def workflow_snapshot: (batch_id: Karya::state_name, now: Time) -> Karya::Workflow::Snapshot + def query_workflow: ( + batch_id: Karya::state_name, + query: Karya::state_name, + now: Time + ) -> Karya::Workflow::QueryResult + def deliver_workflow_signal: ( + batch_id: Karya::state_name, + signal: Karya::state_name, + payload: Karya::interaction_payload, + now: Time + ) -> BulkMutationReport + def deliver_workflow_event: ( + batch_id: Karya::state_name, + event: Karya::state_name, + payload: Karya::interaction_payload, + now: Time + ) -> BulkMutationReport def rollback_workflow: (batch_id: Karya::state_name, now: Time, reason: String) -> BulkMutationReport def retry_workflow_steps: (batch_id: Karya::state_name, step_ids: Array[Karya::state_name], now: Time) -> BulkMutationReport def dead_letter_workflow_steps: ( @@ -127,8 +144,25 @@ module Karya def step_job_ids: () -> Hash[String, String] end + private + def fetch_workflow_registration: (String batch_id) -> InMemory::Internal::StoreState::WorkflowRegistration def normalize_rollback_reason: (String reason) -> String + def deliver_workflow_interaction: ( + action: Karya::bulk_mutation_action, + batch_id: Karya::state_name, + name: Karya::state_name, + payload: Karya::interaction_payload, + now: Time, + kind: Karya::interaction_kind + ) -> BulkMutationReport + def validate_workflow_interaction_delivery: (Karya::Workflow::Snapshot snapshot, String batch_id) -> nil + def validate_workflow_interaction_support: ( + InMemory::Internal::StoreState::WorkflowRegistration registration, + Karya::interaction_kind interaction_kind, + String interaction_name, + String batch_id + ) -> nil def prepare_rollback: (String batch_id, Time now) -> Rollback def build_rollback_batch: (batch_id: String, jobs: Array[Job], now: Time) -> Karya::Workflow::Batch? def raise_duplicate_batch: (String batch_id) -> nil @@ -140,8 +174,7 @@ module Karya now: Time ) { (String, Array[Job], Array[Karya::bulk_skipped_job]) -> void } -> BulkMutationReport def workflow_control_job_ids: (Karya::state_name batch_id, Array[Karya::state_name] step_ids) -> Array[String] - - private + def workflow_interaction_satisfied?: (Job job) -> bool class WorkflowSnapshotBuilder @batch: Karya::Workflow::Batch @@ -168,6 +201,8 @@ module Karya attr_reader state: InMemory::Internal::StoreState def rollback_snapshot: () -> Karya::Workflow::RollbackSnapshot? def child_workflow_snapshots: () -> Array[Karya::Workflow::ChildWorkflowSnapshot] + def interaction_snapshots: () -> Array[Karya::Workflow::InteractionSnapshot] + def interaction_received_at_by_job_id: () -> Hash[String, Time] def parent_snapshot: () -> Karya::Workflow::ChildWorkflowSnapshot? def child_state_resolver: () -> InMemory::Internal::WorkflowChildState end @@ -224,6 +259,41 @@ module Karya def validation_error_message: () -> String end + class InteractionRequirements + @definition: Karya::Workflow::Definition + @step_job_ids: Hash[String, String] + + def initialize: (definition: Karya::Workflow::Definition, step_job_ids: Hash[String, String]) -> void + def to_h: () -> Hash[String, Karya::interaction_requirement] + + private + + attr_reader definition: Karya::Workflow::Definition + attr_reader step_job_ids: Hash[String, String] + end + + class WorkflowQuery + @snapshot: Karya::Workflow::Snapshot + @query: Karya::state_name + @queried_at: Time + + def initialize: ( + snapshot: Karya::Workflow::Snapshot, + query: Karya::state_name, + queried_at: Time + ) -> void + def to_result: () -> Karya::Workflow::QueryResult + + private + + attr_reader snapshot: Karya::Workflow::Snapshot + attr_reader query: Karya::state_name + attr_reader queried_at: Time + def value: () -> Karya::workflow_query_value + def normalized_query: () -> Karya::workflow_query + def current_step_ids: () -> Array[String] + end + def workflow_dependencies_satisfied?: (Job job, now: Time) -> bool def workflow_child_satisfied?: (Job job, now: Time) -> bool end diff --git a/core/karya/sig/karya/workflow.rbs b/core/karya/sig/karya/workflow.rbs index ecd8587..dba3b78 100644 --- a/core/karya/sig/karya/workflow.rbs +++ b/core/karya/sig/karya/workflow.rbs @@ -287,28 +287,138 @@ module Karya end end + class InteractionSnapshot + @kind: Karya::interaction_kind + @name: String + @payload: Karya::interaction_payload + @received_at: Time + + attr_reader kind: Karya::interaction_kind + attr_reader name: String + attr_reader payload: Karya::interaction_payload + attr_reader received_at: Time + + def initialize: ( + kind: Karya::interaction_kind_input, + name: state_name, + payload: Karya::interaction_payload, + received_at: Time + ) -> void + + private + + class Attributes + REQUIRED_ATTRIBUTES: Array[Symbol] + + @attributes: Hash[Symbol, interaction_snapshot_attribute_value] + + def initialize: (Hash[Symbol, interaction_snapshot_attribute_value] attributes) -> void + def kind: () -> Karya::interaction_kind + def name: () -> String + def payload: () -> Karya::interaction_payload + def received_at: () -> Time + + private + + attr_reader attributes: Hash[Symbol, interaction_snapshot_attribute_value] + def fetch: (Symbol name) -> interaction_snapshot_attribute_value + def validate_keys: () -> void + end + + class Payload + @payload: Karya::interaction_payload + + def initialize: (Karya::interaction_payload payload) -> void + def to_h: () -> Karya::interaction_payload + + private + + attr_reader payload: Karya::interaction_payload + def normalize_hash: (Karya::interaction_payload hash) -> Karya::interaction_payload + def normalize_value: (Karya::interaction_payload_value value) -> Karya::interaction_payload_value + end + end + + class QueryResult + @query: String + @value: Karya::workflow_query_value + @queried_at: Time + + attr_reader query: String + attr_reader value: Karya::workflow_query_value + attr_reader queried_at: Time + + def initialize: ( + query: Karya::state_name, + value: Karya::workflow_query_input_value, + queried_at: Time + ) -> void + + private + + class Query + @value: Karya::state_name + + def initialize: (Karya::state_name value) -> void + def to_s: () -> String + + private + + attr_reader value: Karya::state_name + end + + class Value + @query: String + @value: Karya::workflow_query_input_value + + def initialize: (String query, Karya::workflow_query_input_value value) -> void + def normalize: () -> Karya::workflow_query_value + + private + + attr_reader query: String + attr_reader value: Karya::workflow_query_input_value + def normalize_state: () -> Symbol + def normalize_current_step: () -> String? + def normalize_current_steps: () -> Array[String] + end + + class Timestamp + @name: Symbol + @value: Time + + def initialize: (Symbol name, Time value) -> void + def to_time: () -> Time + + private + + attr_reader name: Symbol + attr_reader value: Time + end + end + class StepSnapshot - @workflow_id: String - @batch_id: String - @step_id: String - @job_id: String + @identity: Identity @job: Job - @state: state_name @prerequisite_job_ids: Array[String] @prerequisite_states: Hash[String, state_name?] @child_workflow_id: String? @child_workflow: ChildWorkflowSnapshot? + @interaction: Interaction attr_reader workflow_id: String attr_reader batch_id: String attr_reader step_id: String attr_reader job_id: String attr_reader job: Job - attr_reader state: state_name attr_reader prerequisite_job_ids: Array[String] attr_reader prerequisite_states: Hash[String, state_name?] attr_reader child_workflow_id: String? attr_reader child_workflow: ChildWorkflowSnapshot? + def state: () -> state_name + def interaction_kind: () -> Karya::interaction_kind? + def interaction_name: () -> String? + def interaction_received_at: () -> Time? def initialize: ( workflow_id: state_name, @@ -319,7 +429,10 @@ module Karya prerequisite_job_ids: Array[state_name], prerequisite_states: Hash[state_name, state_name?], ?child_workflow_id: state_name?, - ?child_workflow: ChildWorkflowSnapshot? + ?child_workflow: ChildWorkflowSnapshot?, + ?interaction_kind: Karya::interaction_kind_input?, + ?interaction_name: state_name?, + ?interaction_received_at: Time? ) -> void def ready?: () -> bool def blocked?: () -> bool @@ -346,6 +459,9 @@ module Karya def prerequisite_states: () -> Hash[state_name, state_name?] def child_workflow_id: () -> String? def child_workflow: () -> ChildWorkflowSnapshot? + def interaction_kind: () -> Karya::interaction_kind? + def interaction_name: () -> String? + def interaction_received_at: () -> Time? private @@ -380,6 +496,20 @@ module Karya def validate_identity: () -> void end + class Identity + @workflow_id: String + @batch_id: String + @step_id: String + @job_id: String + + attr_reader workflow_id: String + attr_reader batch_id: String + attr_reader step_id: String + attr_reader job_id: String + + def initialize: (workflow_id: String, batch_id: String, step_id: String, job_id: String) -> void + end + class JobEntry @job_id: String @job: Job @@ -423,17 +553,88 @@ module Karya def validate_membership: (Hash[String, state_name?] normalized_states) -> void end + class Interaction + @kind: Karya::interaction_kind? + @name: String? + @received_at: Time? + + attr_reader kind: Karya::interaction_kind? + attr_reader name: String? + attr_reader received_at: Time? + + def initialize: (kind: Karya::interaction_kind?, name: String?, received_at: Time?) -> void + + private + + def validate_presence: () -> nil + def validate_timestamp_dependency: () -> nil + end + + class InteractionKindValue + @value: step_snapshot_attribute_value + + def initialize: (step_snapshot_attribute_value value) -> void + def to_sym: () -> Karya::interaction_kind? + + private + + attr_reader value: step_snapshot_attribute_value + end + + class OptionalIdentifier + @field_name: Symbol + @value: state_name? + + def initialize: (Symbol field_name, state_name? value) -> void + def to_s: () -> String? + + private + + attr_reader field_name: Symbol + attr_reader value: state_name? + end + + class OptionalTimestamp + @field_name: Symbol + @value: Time? + + def initialize: (Symbol field_name, Time? value) -> void + def to_time: () -> Time? + + private + + attr_reader field_name: Symbol + attr_reader value: Time? + end + + class Timestamp + @name: Symbol + @value: Time + + def initialize: (Symbol name, Time value) -> void + def to_time: () -> Time + + private + + attr_reader name: Symbol + attr_reader value: Time + end + private def waiting?: () -> bool def prerequisites_succeeded?: () -> bool def child_workflow_succeeded?: () -> bool + def interaction_satisfied?: () -> bool end class Snapshot @identity: Identity @membership: Membership @child_relationships: ChildRelationships + @interactions: Array[InteractionSnapshot] + @signals: Array[InteractionSnapshot] + @events: Array[InteractionSnapshot] @step_inspection: StepInspection @parent: ChildWorkflowSnapshot? @rollback: RollbackSnapshot? @@ -448,6 +649,9 @@ module Karya jobs: Array[Job], ?child_workflow_ids_by_step_id: Hash[state_name, state_name], ?child_workflows: Array[ChildWorkflowSnapshot], + ?interactions: Array[InteractionSnapshot], + ?interaction_requirements_by_job_id: Hash[state_name, Karya::interaction_requirement_input], + ?interaction_received_at_by_job_id: Hash[state_name, Time], ?parent: ChildWorkflowSnapshot?, ?rollback: RollbackSnapshot? ) -> void @@ -468,6 +672,9 @@ module Karya def child_workflows: () -> Array[ChildWorkflowSnapshot] def child_workflow: (state_name step_id) -> ChildWorkflowSnapshot? def fetch_child_workflow: (state_name step_id) -> ChildWorkflowSnapshot + def interactions: () -> Array[InteractionSnapshot] + def signals: () -> Array[InteractionSnapshot] + def events: () -> Array[InteractionSnapshot] def parent: () -> ChildWorkflowSnapshot? def rollback: () -> RollbackSnapshot? def state_counts: () -> Hash[state_name, Integer] @@ -492,6 +699,9 @@ module Karya def identity: () -> Identity def membership: () -> Membership def child_relationships: () -> ChildRelationships + def interactions: () -> Array[InteractionSnapshot] + def interaction_requirements_by_job_id: () -> Hash[String, Karya::interaction_requirement] + def interaction_received_at_by_job_id: () -> Hash[String, Time] def parent: () -> ChildWorkflowSnapshot? def rollback: () -> RollbackSnapshot? @@ -523,6 +733,17 @@ module Karya def validate_relationships: () -> void end + class InteractionList + @interactions: Array[InteractionSnapshot] + + def initialize: (Array[InteractionSnapshot] interactions) -> void + def to_a: () -> Array[InteractionSnapshot] + + private + + attr_reader interactions: Array[InteractionSnapshot] + end + class Identity @workflow_id: String @batch_id: String @@ -566,10 +787,17 @@ module Karya @identity: Identity @membership: Membership @child_relationships: ChildRelationships + @interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + @interaction_received_at_by_job_id: Hash[String, Time] @steps: Array[StepSnapshot] @steps_by_id: Hash[String, StepSnapshot] - def initialize: (identity: Identity, membership: Membership, child_relationships: ChildRelationships) -> void + def initialize: ( + identity: Identity, + membership: Membership, + child_relationships: ChildRelationships, + interaction_state: InteractionState + ) -> void attr_reader steps: Array[StepSnapshot] @@ -579,6 +807,8 @@ module Karya private attr_reader identity: Identity + attr_reader interaction_received_at_by_job_id: Hash[String, Time] + attr_reader interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] attr_reader membership: Membership attr_reader child_relationships: ChildRelationships attr_reader steps_by_id: Hash[String, StepSnapshot] @@ -586,6 +816,26 @@ module Karya def prerequisite_states_for: (Array[String] prerequisite_job_ids) -> Hash[String, state_name?] end + class InteractionState + @interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + @interaction_received_at_by_job_id: Hash[String, Time] + @interactions: Array[InteractionSnapshot] + + attr_reader interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + + def initialize: ( + interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement], + interaction_received_at_by_job_id: Hash[String, Time], + interactions: Array[InteractionSnapshot] + ) -> void + def received_at_by_job_id: () -> Hash[String, Time] + + private + + attr_reader interaction_received_at_by_job_id: Hash[String, Time] + attr_reader interactions: Array[InteractionSnapshot] + end + class ChildWorkflowIds @child_workflow_ids_by_step_id: Hash[state_name, state_name] @@ -608,6 +858,121 @@ module Karya attr_reader child_workflows: Array[ChildWorkflowSnapshot] end + class InteractionRequirements + @interaction_requirements_by_job_id: Hash[state_name, Karya::interaction_requirement_input] + + def initialize: (Hash[state_name, Karya::interaction_requirement_input] interaction_requirements_by_job_id) -> void + def to_h: () -> Hash[String, Karya::interaction_requirement] + + private + + attr_reader interaction_requirements_by_job_id: Hash[state_name, Karya::interaction_requirement_input] + + class Requirement + @requirement: Karya::interaction_requirement_input + + def initialize: (Karya::interaction_requirement_input requirement) -> void + def to_h: () -> Karya::interaction_requirement + + private + + attr_reader requirement: Karya::interaction_requirement_input + end + end + + class InteractionReceivedAtByJobId + @interaction_received_at_by_job_id: Hash[state_name, Time] + + def initialize: (Hash[state_name, Time] interaction_received_at_by_job_id) -> void + def to_h: () -> Hash[String, Time] + + private + + attr_reader interaction_received_at_by_job_id: Hash[state_name, Time] + end + + class InteractionDeliveries + @interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + @interactions: Array[InteractionSnapshot] + + def initialize: ( + interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement], + interactions: Array[InteractionSnapshot] + ) -> void + def to_h: () -> Hash[String, Time] + + private + + attr_reader interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + attr_reader interactions: Array[InteractionSnapshot] + + class DeliveryIndex + @interactions: Array[InteractionSnapshot] + + def initialize: (Array[InteractionSnapshot] interactions) -> void + def to_h: () -> Hash[[Karya::interaction_kind, String], Time] + + private + + attr_reader interactions: Array[InteractionSnapshot] + + class DeliveryEntry + @interaction: InteractionSnapshot + + def initialize: (InteractionSnapshot interaction) -> void + def to_pair: () -> [[Karya::interaction_kind, String], Time] + + private + + attr_reader interaction: InteractionSnapshot + end + end + + class MatchingJobIndex + @interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + @index: Hash[[Karya::interaction_kind, String], Array[String]] + + def initialize: (Hash[String, Karya::interaction_requirement] interaction_requirements_by_job_id) -> void + def to_h: () -> Hash[[Karya::interaction_kind, String], Array[String]] + + private + + attr_reader index: Hash[[Karya::interaction_kind, String], Array[String]] + attr_reader interaction_requirements_by_job_id: Hash[String, Karya::interaction_requirement] + def register: ([Karya::interaction_kind, String] interaction_key, String job_id) -> ([String] | Array[String]) + + class RequirementKey + @requirement: Karya::interaction_requirement + + def initialize: (Karya::interaction_requirement requirement) -> void + def to_a: () -> [Karya::interaction_kind, String] + + private + + attr_reader requirement: Karya::interaction_requirement + end + end + + class ReceivedAtByJobId + @delivery_index: Hash[[Karya::interaction_kind, String], Time] + @matching_job_index: Hash[[Karya::interaction_kind, String], Array[String]] + @received_at_by_job_id: Hash[String, Time] + + def initialize: ( + delivery_index: Hash[[Karya::interaction_kind, String], Time], + matching_job_index: Hash[[Karya::interaction_kind, String], Array[String]] + ) -> void + def to_h: () -> Hash[String, Time] + + private + + attr_reader delivery_index: Hash[[Karya::interaction_kind, String], Time] + attr_reader matching_job_index: Hash[[Karya::interaction_kind, String], Array[String]] + attr_reader received_at_by_job_id: Hash[String, Time] + def register: ([Karya::interaction_kind, String] interaction_key, Time received_at) -> Array[String] + end + end + class SummaryData @state_counts: Hash[state_name, Integer] @total_count: Integer @@ -621,7 +986,7 @@ module Karya attr_reader failed_count: Integer attr_reader state: Karya::workflow_state - def initialize: (Membership membership, ChildRelationships child_relationships) -> void + def initialize: (Membership membership, StepInspection step_inspection) -> void end class Timestamp @@ -708,35 +1073,21 @@ module Karya class State @jobs: Array[Job] - @step_job_ids: Hash[String, String] - @dependency_job_ids_by_job_id: Hash[String, Array[String]] - @child_relationships: ChildRelationships - @jobs_by_id: Hash[String, Job] - @step_id_by_job_id: Hash[String, String] + @steps: Array[StepSnapshot] - def initialize: ( - jobs: Array[Job], - step_job_ids: Hash[String, String], - dependency_job_ids_by_job_id: Hash[String, Array[String]], - child_relationships: ChildRelationships - ) -> void + def initialize: (jobs: Array[Job], steps: Array[StepSnapshot]) -> void def to_sym: () -> Karya::workflow_state private attr_reader jobs: Array[Job] - attr_reader dependency_job_ids_by_job_id: Hash[String, Array[String]] - attr_reader child_relationships: ChildRelationships - attr_reader jobs_by_id: Hash[String, Job] - attr_reader step_id_by_job_id: Hash[String, String] + attr_reader steps: Array[StepSnapshot] def failed?: () -> bool def only_state?: (state_name state) -> bool def terminal_mixed?: () -> bool def running?: () -> bool def progressed?: () -> bool def blocked?: () -> bool - def dependency_blocked?: (Job job) -> bool - def child_workflow_blocked?: (Job job) -> bool end end @@ -758,6 +1109,8 @@ module Karya @child_workflow: String? @compensate_with: String? @compensation_arguments: Hash[String, job_argument] + @wait_for_signal: String? + @wait_for_event: String? attr_reader id: String attr_reader handler: String @@ -766,6 +1119,8 @@ module Karya attr_reader child_workflow: String? attr_reader compensate_with: String? attr_reader compensation_arguments: Hash[String, job_argument] + attr_reader wait_for_signal: String? + attr_reader wait_for_event: String? def initialize: ( id: state_name, @@ -774,7 +1129,9 @@ module Karya ?depends_on: (state_name | Array[state_name] | nil), ?compensate_with: state_name?, ?compensation_arguments: Hash[state_name, job_argument], - ?child_workflow: state_name? + ?child_workflow: state_name?, + ?wait_for_signal: state_name?, + ?wait_for_event: state_name? ) -> void def compensable?: () -> bool def child_workflow?: () -> bool @@ -812,11 +1169,16 @@ module Karya def compensate_with: () -> state_name? def child_workflow: () -> state_name? def compensation_arguments: () -> Hash[state_name, job_argument] + def wait_for_signal: () -> state_name? + def wait_for_event: () -> state_name? private attr_reader options: Hash[Symbol, Hash[state_name, job_argument] | (state_name | Array[state_name] | nil)] def validate_keys: () -> void + def unexpected_keys: () -> Array[Symbol] + def unknown_keyword_label: () -> String + def formatted_unexpected_keys: () -> String end class Dependencies @@ -853,6 +1215,21 @@ module Karya attr_reader value: state_name? end + class InteractionName + @field_name: Symbol + @value: state_name? + + def initialize: (Symbol field_name, state_name? value) -> void + def normalize: () -> String? + + private + + attr_reader field_name: Symbol + attr_reader value: state_name? + end + + def validate_compensation_configuration: () -> nil + def validate_interaction_configuration: () -> nil def compensation_handler_label: () -> String end @@ -1057,7 +1434,9 @@ module Karya ?depends_on: (state_name | Array[state_name] | nil), ?compensate_with: state_name?, ?compensation_arguments: Hash[state_name, job_argument], - ?child_workflow: state_name? + ?child_workflow: state_name?, + ?wait_for_signal: state_name?, + ?wait_for_event: state_name? ) -> nil def to_definition: () -> Definition diff --git a/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb b/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb index bb51a55..0cf8ade 100644 --- a/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb +++ b/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb @@ -67,6 +67,8 @@ def build_report(**overrides) it 'accepts workflow step control actions' do actions = %i[ + deliver_workflow_signal + deliver_workflow_event enqueue_child_workflow retry_workflow_steps dead_letter_workflow_steps diff --git a/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb b/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb index 4619bde..ecd532f 100644 --- a/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb +++ b/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb @@ -31,6 +31,10 @@ def rollback_batch_id(batch_id) workflow_support.const_get(:RollbackBatchId, false).new(batch_id).to_s end + def interaction_snapshot(kind: :signal, name: :manager_approved, payload: {}) + Karya::Workflow::InteractionSnapshot.new(kind:, name:, payload:, received_at: created_at) + end + it 'ignores execution tokens that are not present' do store_state.execution_tokens_in_order << 'lease-1' @@ -198,6 +202,7 @@ def rollback_batch_id(batch_id) step_job_ids = { 'root' => 'job-root' } dependency_job_ids = [] dependency_job_ids_by_job_id = { 'job-root' => dependency_job_ids } + interaction_requirements_by_job_id = { 'job-root' => { kind: :signal, name: 'manager_approved' }.freeze } compensation_jobs_by_step_id = { 'root' => instance_double(Karya::Job) } registration = store_state.register_workflow( @@ -205,26 +210,171 @@ def rollback_batch_id(batch_id) workflow_id: 'invoice_closeout', step_job_ids:, dependency_job_ids_by_job_id:, + interaction_requirements_by_job_id:, compensation_jobs_by_step_id: ) step_job_ids['root'] = 'mutated' dependency_job_ids << 'mutated' dependency_job_ids_by_job_id['job-root'] = ['mutated'] + interaction_requirements_by_job_id['job-root'] = { kind: :event, name: 'payment_received' } compensation_jobs_by_step_id['root'] = instance_double(Karya::Job) expect(registration.workflow_id).to eq('invoice_closeout') expect(registration.step_job_ids).to eq('root' => 'job-root') expect(registration.dependency_job_ids_by_job_id).to eq('job-root' => []) + expect(registration.interaction_requirements_by_job_id).to eq('job-root' => { kind: :signal, name: 'manager_approved' }) expect(registration.compensation_jobs_by_step_id.keys).to eq(['root']) expect(registration.step_job_ids).to be_frozen expect(registration.dependency_job_ids_by_job_id).to be_frozen expect(registration.dependency_job_ids_by_job_id.fetch('job-root')).to be_frozen + expect(registration.interaction_requirements_by_job_id).to be_frozen + expect(registration.interaction_requirements_by_job_id.fetch('job-root')).to be_frozen expect(registration.compensation_jobs_by_step_id).to be_frozen expect(registration).to be_frozen expect(store_state.workflow_registrations_by_batch_id.fetch('batch-1')).to eq(registration) expect(store_state.workflow_registrations_by_batch_id['missing']).to be_nil end + it 'stores workflow interactions by batch id' do + signal = interaction_snapshot(kind: :signal, name: :manager_approved) + event = interaction_snapshot(kind: :event, name: :payment_received) + updated_signal = interaction_snapshot(kind: :signal, name: :manager_approved) + + store_state.register_workflow_interaction(batch_id: 'batch-1', interaction: signal) + store_state.register_workflow_interaction(batch_id: 'batch-1', interaction: event) + store_state.register_workflow_interaction(batch_id: 'batch-1', interaction: updated_signal) + + expect(store_state.workflow_interactions_for('batch-1')).to eq([signal, event, updated_signal]) + expect(store_state.workflow_interactions_for('missing')).to eq([]) + end + + it 'reuses the same frozen interaction snapshot array until the inbox changes' do + signal = interaction_snapshot(kind: :signal, name: :manager_approved) + + store_state.register_workflow_interaction(batch_id: 'batch-1', interaction: signal) + + interactions = store_state.workflow_interactions_for('batch-1') + + expect(store_state.workflow_interactions_for('batch-1')).to equal(interactions) + expect(interactions).to be_frozen + end + + it 'retains only the latest bounded workflow interactions per batch' do + max = described_class.send(:const_get, :WorkflowInteractions).send(:const_get, :MAX_INTERACTIONS_PER_BATCH) + + (max + 1).times do |index| + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot( + kind: :signal, + name: :manager_approved, + payload: { 'attempt' => index } + ) + ) + end + + expect(store_state.workflow_interactions_for('batch-1').length).to eq(max) + expect(store_state.workflow_interactions_for('batch-1').map { |interaction| interaction.payload.fetch('attempt') }).to eq((1..max).to_a) + end + + it 'keeps delivered interaction satisfaction across rolling retention' do + max = described_class.send(:const_get, :WorkflowInteractions).send(:const_get, :MAX_INTERACTIONS_PER_BATCH) + store_state.workflow_interactions.configure( + batch_id: 'batch-1', + supported_keys: { + [:signal, 'manager_approved'] => true, + [:event, "payment_received_#{max - 1}"] => true + } + ) + + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot(kind: :signal, name: :manager_approved) + ) + + max.times do |index| + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot(kind: :event, name: "payment_received_#{index}") + ) + end + + expect(store_state.workflow_interaction_delivered?(batch_id: 'batch-1', kind: :signal, name: 'manager_approved')).to be(true) + expect(store_state.workflow_interaction_delivered?(batch_id: 'batch-1', kind: :event, name: "payment_received_#{max - 1}")).to be(true) + end + + it 'tracks the latest received_at for delivered interactions independently of retained history' do + max = described_class.send(:const_get, :WorkflowInteractions).send(:const_get, :MAX_INTERACTIONS_PER_BATCH) + store_state.workflow_interactions.configure( + batch_id: 'batch-1', + supported_keys: { + [:signal, 'manager_approved'] => true, + [:event, "payment_received_#{max - 1}"] => true + } + ) + + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot(kind: :signal, name: :manager_approved) + ) + + max.times do |index| + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: Karya::Workflow::InteractionSnapshot.new( + kind: :event, + name: "payment_received_#{index}", + payload: {}, + received_at: created_at + index + 1 + ) + ) + end + + expect(store_state.workflow_interaction_received_at(batch_id: 'batch-1', kind: :signal, name: 'manager_approved')).to eq(created_at) + expect(store_state.workflow_interaction_received_at(batch_id: 'batch-1', kind: :event, name: "payment_received_#{max - 1}")).to eq(created_at + max) + end + + it 'bounds readiness tracking to declared workflow interaction requirements' do + store_state.register_workflow( + batch_id: 'batch-1', + workflow_id: 'invoice_closeout', + step_job_ids: { 'approve' => 'job-1' }, + dependency_job_ids_by_job_id: { 'job-1' => [] }, + interaction_requirements_by_job_id: { 'job-1' => { kind: :signal, name: 'manager_approved' } }, + compensation_jobs_by_step_id: {} + ) + + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot(kind: :signal, name: :manager_approved) + ) + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot(kind: :event, name: :payment_received) + ) + + inbox = store_state.workflow_interactions.instance_variable_get(:@by_batch_id).fetch('batch-1') + received_at_by_key = inbox.instance_variable_get(:@received_at_by_key) + + expect(received_at_by_key.keys).to eq([[:signal, 'manager_approved']]) + expect(store_state.workflow_interaction_received_at(batch_id: 'batch-1', kind: :event, name: 'payment_received')).to be_nil + end + + it 'accepts supported interaction keys as an enumerable during inbox configuration' do + store_state.workflow_interactions.configure( + batch_id: 'batch-1', + supported_keys: [[:signal, 'manager_approved']] + ) + + store_state.register_workflow_interaction( + batch_id: 'batch-1', + interaction: interaction_snapshot(kind: :signal, name: :manager_approved) + ) + + expect(store_state.workflow_interaction_delivered?(batch_id: 'batch-1', kind: :signal, name: 'manager_approved')).to be(true) + expect(store_state.workflow_interaction_received_at(batch_id: 'batch-1', kind: :signal, name: 'manager_approved')).to eq(created_at) + end + it 'cleans up child workflow relationships by parent batch' do workflow_children = store_state.workflow_children @@ -254,6 +404,22 @@ def rollback_batch_id(batch_id) expect(workflow_children.expected_child_workflow_id_by_job_id).to eq({}) end + it 'prunes workflow interaction inboxes with workflow batch cleanup' do + store_state.jobs_by_id['job-1'] = succeeded_job('job-1') + store_state.register_batch(batch('batch-1', ['job-1'])) + store_state.register_workflow( + batch_id: 'batch-1', + workflow_id: 'invoice_closeout', + step_job_ids: { 'root' => 'job-1' }, + dependency_job_ids_by_job_id: { 'job-1' => [] }, + compensation_jobs_by_step_id: {} + ) + store_state.register_workflow_interaction(batch_id: 'batch-1', interaction: interaction_snapshot) + + expect(store_state.prune_terminal_batches(0)).to eq(['batch-1']) + expect(store_state.workflow_interactions_for('batch-1')).to eq([]) + end + it 'cleans up child workflow relationships by child batch' do workflow_children = store_state.workflow_children diff --git a/core/karya/spec/karya/queue_store/in_memory/internal/workflow_query_spec.rb b/core/karya/spec/karya/queue_store/in_memory/internal/workflow_query_spec.rb new file mode 100644 index 0000000..3fd15a5 --- /dev/null +++ b/core/karya/spec/karya/queue_store/in_memory/internal/workflow_query_spec.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +RSpec.describe 'Karya::QueueStore::InMemory::Internal::WorkflowSupport::WorkflowQuery' do + let(:queried_at) { Time.utc(2026, 4, 26, 12, 0, 0) } + let(:described_class) do + internal = Karya::QueueStore::InMemory.const_get(:Internal, false) + workflow_support = internal.const_get(:WorkflowSupport, false) + workflow_support.const_get(:WorkflowQuery, false) + end + + def snapshot(steps) + instance_double(Karya::Workflow::Snapshot, steps:, state: :running) + end + + def step(step_id, active: false, ready: false, blocked: false, prerequisite_states: {}) + instance_double( + Karya::Workflow::StepSnapshot, + step_id:, + active?: active, + ready?: ready, + blocked?: blocked, + prerequisite_states: + ) + end + + it 'prefers active steps over ready queued work' do + result = described_class.new( + snapshot: snapshot([step('root', active: true), step('child', ready: true)]), + query: 'current-steps', + queried_at: + ).to_result + + expect(result).to have_attributes(query: 'current-steps', value: ['root']) + end + + it 'falls back to blocked steps when no active or ready work exists' do + result = described_class.new( + snapshot: snapshot([step('approve', blocked: true)]), + query: 'current-step', + queried_at: + ).to_result + + expect(result).to have_attributes(query: 'current-step', value: 'approve') + end + + it 'excludes dependency-blocked descendants from current blocked steps' do + result = described_class.new( + snapshot: snapshot( + [ + step('approve', blocked: true), + step('capture_payment', blocked: true, prerequisite_states: { 'job-approve' => :queued }) + ] + ), + query: 'current-steps', + queried_at: + ).to_result + + expect(result).to have_attributes(query: 'current-steps', value: ['approve']) + end + + it 'rejects unsupported queries' do + expect do + described_class.new(snapshot: snapshot([]), query: 'unsupported', queried_at:).to_result + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unsupported workflow query "unsupported"') + + query = described_class.new(snapshot: snapshot([]), query: 'state', queried_at:) + allow(query).to receive(:normalized_query).and_return('unsupported') + + expect do + query.to_result + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unsupported workflow query "unsupported"') + end +end diff --git a/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb b/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb index 5d12837..76bc15a 100644 --- a/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb +++ b/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb @@ -401,13 +401,359 @@ def run_successfully(reservation, start_offset:, complete_offset:) store.workflow_snapshot(batch_id: :missing, now: created_at + 1) end.to raise_error(Karya::Workflow::UnknownBatchError, 'batch "missing" is not registered') - store.enqueue_many(jobs: [workflow_job(:root)], batch_id: :plain_batch, now: created_at + 2) + store.enqueue_many( + jobs: [ + Karya::Job.new( + id: 'job-plain_root', + queue: :plain, + handler: :plain_root, + arguments: {}, + priority: 0, + state: :submission, + created_at:, + scheduled_at: created_at, + updated_at: created_at + ) + ], + batch_id: :plain_batch, + now: created_at + 2 + ) expect do store.workflow_snapshot(batch_id: :plain_batch, now: created_at + 3) end.to raise_error(Karya::Workflow::InvalidExecutionError, 'batch "plain_batch" is not a workflow batch') end + it 'delivers supported workflow signals and external events into snapshot inspection' do + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + step :capture_payment, handler: :capture_payment, wait_for_event: :payment_received + end + store.enqueue_workflow( + definition:, + jobs_by_step_id: { + approve: workflow_job(:approve), + capture_payment: workflow_job(:capture_payment) + }, + batch_id: :batch_one, + now: created_at + 1 + ) + + signal_report = store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'approved_by' => 'ops' }, + now: created_at + 2 + ) + event_report = store.deliver_workflow_event( + batch_id: :batch_one, + event: :payment_received, + payload: { 'source' => 'stripe' }, + now: created_at + 3 + ) + + snapshot = store.workflow_snapshot(batch_id: :batch_one, now: created_at + 4) + + expect(signal_report).to have_attributes(action: :deliver_workflow_signal, requested_job_ids: [], changed_jobs: [], skipped_jobs: []) + expect(event_report.action).to eq(:deliver_workflow_event) + expect(snapshot.interactions.map(&:name)).to eq(%w[manager_approved payment_received]) + expect(snapshot.signals.map(&:name)).to eq(['manager_approved']) + expect(snapshot.events.map(&:name)).to eq(['payment_received']) + end + + it 'preserves repeated deliveries of the same interaction in snapshot inspection' do + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + end + store.enqueue_workflow( + definition:, + jobs_by_step_id: { approve: workflow_job(:approve) }, + batch_id: :batch_one, + now: created_at + 1 + ) + + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'approved_by' => 'ops-1' }, + now: created_at + 2 + ) + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'approved_by' => 'ops-2' }, + now: created_at + 3 + ) + + snapshot = store.workflow_snapshot(batch_id: :batch_one, now: created_at + 4) + + expect(snapshot.interactions.map(&:payload)).to eq( + [ + { 'approved_by' => 'ops-1' }, + { 'approved_by' => 'ops-2' } + ] + ) + expect(snapshot.signals.map(&:received_at)).to eq([created_at + 2, created_at + 3]) + end + + it 'retains only the latest bounded interaction deliveries per workflow batch' do + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + end + max = 100 + store.enqueue_workflow( + definition:, + jobs_by_step_id: { approve: workflow_job(:approve) }, + batch_id: :batch_one, + now: created_at + 1 + ) + + (max + 1).times do |index| + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'attempt' => index }, + now: created_at + 2 + index + ) + end + + snapshot = store.workflow_snapshot(batch_id: :batch_one, now: created_at + max + 3) + + expect(snapshot.signals.length).to eq(max) + expect(snapshot.signals.map { |interaction| interaction.payload.fetch('attempt') }).to eq((1..max).to_a) + end + + it 'keeps waiting-step interaction satisfaction after old deliveries roll out of inspection history' do + max = 100 + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + max.times do |index| + step :"noise_#{index}", handler: :"noise_#{index}", wait_for_signal: :"noise_#{index}" + end + end + store.enqueue_workflow( + definition:, + jobs_by_step_id: { approve: workflow_job(:approve) }.merge( + max.times.to_h do |index| + step_id = :"noise_#{index}" + [step_id, workflow_job(step_id, handler: step_id)] + end + ), + batch_id: :batch_one, + now: created_at + 1 + ) + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'approved_by' => 'ops' }, + now: created_at + 2 + ) + + max.times do |index| + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :"noise_#{index}", + payload: { 'attempt' => index }, + now: created_at + 3 + index + ) + end + + snapshot = store.workflow_snapshot(batch_id: :batch_one, now: created_at + max + 4) + + expect(snapshot.fetch_step(:approve)).to be_ready + end + + it 'rejects workflow interaction delivery for unknown, non-workflow, unsupported, and terminal batches' do + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + end + + expect do + store.deliver_workflow_signal(batch_id: :missing, signal: :manager_approved, payload: {}, now: created_at + 1) + end.to raise_error(Karya::Workflow::UnknownBatchError, 'batch "missing" is not registered') + + store.enqueue_many( + jobs: [ + Karya::Job.new( + id: 'job-plain_root', + queue: :plain, + handler: :plain_root, + arguments: {}, + priority: 0, + state: :submission, + created_at:, + scheduled_at: created_at, + updated_at: created_at + ) + ], + batch_id: :plain_batch, + now: created_at + 2 + ) + expect do + store.deliver_workflow_event(batch_id: :plain_batch, event: :payment_received, payload: {}, now: created_at + 3) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'batch "plain_batch" is not a workflow batch') + + store.enqueue_workflow( + definition:, + jobs_by_step_id: { approve: workflow_job(:approve, handler: :approve) }, + batch_id: :batch_one, + now: created_at + 4 + ) + expect do + store.deliver_workflow_event(batch_id: :batch_one, event: :payment_received, payload: {}, now: created_at + 5) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'workflow batch "batch_one" does not support event "payment_received"') + + store.deliver_workflow_signal(batch_id: :batch_one, signal: :manager_approved, payload: {}, now: created_at + 6) + root = reserve(7) + run_successfully(root, start_offset: 8, complete_offset: 9) + + expect do + store.deliver_workflow_signal(batch_id: :batch_one, signal: :manager_approved, payload: {}, now: created_at + 8) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'workflow batch "batch_one" is terminal and cannot receive interactions') + end + + it 'rejects workflow interaction payloads that are not string-keyed JSON-compatible hashes' do + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + step :capture_payment, handler: :capture_payment, wait_for_event: :payment_received + end + store.enqueue_workflow( + definition:, + jobs_by_step_id: { + approve: workflow_job(:approve), + capture_payment: workflow_job(:capture_payment) + }, + batch_id: :batch_one, + now: created_at + 1 + ) + + expect do + store.deliver_workflow_signal(batch_id: :batch_one, signal: :manager_approved, payload: { approved_by: 'ops' }, now: created_at + 2) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload keys must be Strings') + expect do + store.deliver_workflow_event(batch_id: :batch_one, event: :payment_received, payload: { 'received_at' => created_at }, now: created_at + 3) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload values must be JSON-compatible') + expect do + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'message' => 'x' * (16 * 1024) }, + now: created_at + 4 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload exceeds 16384 bytes') + end + + it 'supports explicit workflow queries for state and current steps' do + definition = Karya::Workflow.define(:interactive) do + step :root, handler: :root + step :capture_payment, handler: :capture_payment, depends_on: :root + step :emit_receipt, handler: :emit_receipt, depends_on: :root + end + store.enqueue_workflow( + definition:, + jobs_by_step_id: { + root: workflow_job(:root), + capture_payment: workflow_job(:capture_payment), + emit_receipt: workflow_job(:emit_receipt) + }, + batch_id: :batch_one, + now: created_at + 1 + ) + + initial_state = store.query_workflow(batch_id: :batch_one, query: :state, now: created_at + 2) + initial_step = store.query_workflow(batch_id: :batch_one, query: 'current-step', now: created_at + 3) + run_successfully(reserve(4), start_offset: 5, complete_offset: 6) + current_steps = store.query_workflow(batch_id: :batch_one, query: 'current-steps', now: created_at + 7) + + expect(initial_state).to have_attributes(query: 'state', value: :blocked) + expect(initial_step).to have_attributes(query: 'current-step', value: 'root') + expect(current_steps).to have_attributes(query: 'current-steps', value: %w[capture_payment emit_receipt]) + end + + it 'lets signals and events drive workflow readiness transitions through waiting steps' do + signal_definition = Karya::Workflow.define(:signal_gate) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + end + event_definition = Karya::Workflow.define(:event_gate) do + step :capture_payment, handler: :capture_payment, wait_for_event: :payment_received + end + + store.enqueue_workflow( + definition: signal_definition, + jobs_by_step_id: { approve: workflow_job(:approve) }, + batch_id: :signal_batch, + now: created_at + 1 + ) + store.enqueue_workflow( + definition: event_definition, + jobs_by_step_id: { capture_payment: workflow_job(:capture_payment) }, + batch_id: :event_batch, + now: created_at + 2 + ) + + expect(reserve(3)).to be_nil + + blocked_signal = store.workflow_snapshot(batch_id: :signal_batch, now: created_at + 4) + blocked_event = store.workflow_snapshot(batch_id: :event_batch, now: created_at + 5) + + store.deliver_workflow_signal( + batch_id: :signal_batch, + signal: :manager_approved, + payload: { 'approved_by' => 'ops' }, + now: created_at + 6 + ) + store.deliver_workflow_event( + batch_id: :event_batch, + event: :payment_received, + payload: { 'source' => 'stripe' }, + now: created_at + 7 + ) + + signal_ready = store.workflow_snapshot(batch_id: :signal_batch, now: created_at + 8) + event_ready = store.workflow_snapshot(batch_id: :event_batch, now: created_at + 9) + + expect(blocked_signal.fetch_step(:approve)).to be_blocked + expect(blocked_event.fetch_step(:capture_payment)).to be_blocked + expect(signal_ready.fetch_step(:approve)).to be_ready + expect(event_ready.fetch_step(:capture_payment)).to be_ready + expect(reserve(10).job_id).to eq('job-approve') + expect(reserve(11).job_id).to eq('job-capture_payment') + end + + it 'keeps workflow interactions visible across retry and rollback controls' do + definition = Karya::Workflow.define(:interactive) do + step :approve, handler: :approve, wait_for_signal: :manager_approved + end + store.enqueue_workflow( + definition:, + jobs_by_step_id: { approve: workflow_job(:approve) }, + batch_id: :batch_one, + now: created_at + 1 + ) + store.deliver_workflow_signal( + batch_id: :batch_one, + signal: :manager_approved, + payload: { 'approved_by' => 'ops' }, + now: created_at + 2 + ) + root = reserve(3) + store.start_execution(reservation_token: root.token, now: created_at + 4) + store.fail_execution(reservation_token: root.token, now: created_at + 5, failure_classification: :error) + + retry_report = store.retry_workflow_steps(batch_id: :batch_one, step_ids: [:approve], now: created_at + 6) + retry_snapshot = store.workflow_snapshot(batch_id: :batch_one, now: created_at + 7) + retried_root = reserve(8) + store.start_execution(reservation_token: retried_root.token, now: created_at + 9) + store.fail_execution(reservation_token: retried_root.token, now: created_at + 10, failure_classification: :error) + rollback_report = store.rollback_workflow(batch_id: :batch_one, now: created_at + 11, reason: 'operator rollback') + rollback_snapshot = store.workflow_snapshot(batch_id: :batch_one, now: created_at + 12) + + expect(retry_report.changed_jobs.map(&:id)).to eq(['job-approve']) + expect(retry_snapshot.interactions.map(&:name)).to eq(['manager_approved']) + expect(rollback_report.action).to eq(:rollback_workflow) + expect(rollback_snapshot.interactions.map(&:name)).to eq(['manager_approved']) + end + it 'retries explicit failed and retry-pending workflow steps without bypassing dependency gates' do definition = Karya::Workflow.define(:workflow_retry_controls) do step :root, handler: :root diff --git a/core/karya/spec/karya/queue_store_base_spec.rb b/core/karya/spec/karya/queue_store_base_spec.rb index a553585..e9fa6dc 100644 --- a/core/karya/spec/karya/queue_store_base_spec.rb +++ b/core/karya/spec/karya/queue_store_base_spec.rb @@ -49,6 +49,34 @@ end.to raise_error(NotImplementedError, /implement #workflow_snapshot/) end + it 'requires query_workflow to be implemented' do + expect do + store.query_workflow(batch_id: 'batch-1', query: 'state', now: Time.utc(2026, 3, 27, 12, 0, 0)) + end.to raise_error(NotImplementedError, /implement #query_workflow/) + end + + it 'requires deliver_workflow_signal to be implemented' do + expect do + store.deliver_workflow_signal( + batch_id: 'batch-1', + signal: 'manager-approved', + payload: { 'approved_by' => 'ops' }, + now: Time.utc(2026, 3, 27, 12, 0, 0) + ) + end.to raise_error(NotImplementedError, /implement #deliver_workflow_signal/) + end + + it 'requires deliver_workflow_event to be implemented' do + expect do + store.deliver_workflow_event( + batch_id: 'batch-1', + event: 'payment-received', + payload: { 'source' => 'stripe' }, + now: Time.utc(2026, 3, 27, 12, 0, 0) + ) + end.to raise_error(NotImplementedError, /implement #deliver_workflow_event/) + end + it 'requires enqueue_child_workflow to be implemented' do expect do store.enqueue_child_workflow( diff --git a/core/karya/spec/karya/workflow/interaction_snapshot_spec.rb b/core/karya/spec/karya/workflow/interaction_snapshot_spec.rb new file mode 100644 index 0000000..413330b --- /dev/null +++ b/core/karya/spec/karya/workflow/interaction_snapshot_spec.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +RSpec.describe Karya::Workflow::InteractionSnapshot do + let(:received_at) { Time.utc(2026, 4, 26, 12, 0, 0) } + + it 'normalizes, deep-freezes, and exposes interaction data' do + mutable_string = +'ops' + snapshot = described_class.new( + kind: 'signal', + name: ' manager-approved ', + payload: { + 'approved_by' => mutable_string, + 'attempts' => [1, 2], + 'metadata' => { 'source' => 'console' } + }, + received_at: + ) + mutable_string.replace('changed') + + expect(snapshot).to have_attributes( + kind: :signal, + name: 'manager-approved', + payload: { + 'approved_by' => 'ops', + 'attempts' => [1, 2], + 'metadata' => { 'source' => 'console' } + }, + received_at: + ) + expect(snapshot).to be_frozen + expect(snapshot.payload).to be_frozen + expect(snapshot.payload.fetch('approved_by')).to be_frozen + expect(snapshot.payload.fetch('attempts')).to be_frozen + expect(snapshot.payload.fetch('metadata')).to be_frozen + end + + it 'rejects invalid kinds and payload shapes' do + expect do + described_class.new(kind: :unknown, name: :signal, payload: {}, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'kind must be :signal or :event') + expect do + described_class.new(kind: 123, name: :signal, payload: {}, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'kind must be :signal or :event') + expect do + described_class.new(kind: :signal, name: :signal, payload: {}, received_at: '2026-04-26T12:00:00Z') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'received_at must be a Time') + expect do + described_class.new(kind: :signal, name: :signal, payload: 'payload', received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload must be a Hash') + expect do + described_class.new(kind: :signal, name: :signal, payload: { source: 'console' }, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload keys must be Strings') + expect do + described_class.new(kind: :signal, name: :signal, payload: { 'received_at' => Time.now }, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload values must be JSON-compatible') + invalid_string = +"\xFF" + invalid_string.force_encoding(Encoding::UTF_8) + expect do + described_class.new(kind: :signal, name: :signal, payload: { 'message' => invalid_string }, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload must be JSON-encodable') + expect do + described_class.new(kind: :signal, name: :signal, payload: { 'message' => 'x' * (16 * 1024) }, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'payload exceeds 16384 bytes') + expect do + described_class.new(kind: :signal, name: ' ', payload: {}, received_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'name must be present') + end + + it 'rejects unknown attributes' do + expect do + described_class.new( + kind: :event, + name: :payment_received, + payload: {}, + received_at:, + unexpected: true + ) + end.to raise_error(ArgumentError, 'unknown keyword: :unexpected') + end +end diff --git a/core/karya/spec/karya/workflow/query_result_spec.rb b/core/karya/spec/karya/workflow/query_result_spec.rb new file mode 100644 index 0000000..5ae8393 --- /dev/null +++ b/core/karya/spec/karya/workflow/query_result_spec.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +RSpec.describe Karya::Workflow::QueryResult do + let(:queried_at) { Time.utc(2026, 4, 26, 12, 0, 0) } + + it 'normalizes supported workflow queries' do + state = described_class.new(query: ' state ', value: :running, queried_at:) + current_step = described_class.new(query: :'current-step', value: ' capture_payment ', queried_at:) + current_steps = described_class.new(query: 'current-steps', value: %i[capture_payment emit_receipt], queried_at:) + + expect(state).to have_attributes(query: 'state', value: :running, queried_at:) + expect(current_step).to have_attributes(query: 'current-step', value: 'capture_payment', queried_at:) + expect(current_steps).to have_attributes(query: 'current-steps', value: %w[capture_payment emit_receipt], queried_at:) + expect(current_steps.value).to be_frozen + end + + it 'allows current-step queries to return nil' do + result = described_class.new(query: 'current-step', value: nil, queried_at:) + + expect(result.value).to be_nil + end + + it 'rejects unsupported query names and invalid values' do + expect do + described_class.new(query: 'unknown', value: :running, queried_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unsupported workflow query "unknown"') + expect do + described_class.new(query: 'state', value: 'running', queried_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'workflow query "state" must return a Symbol') + expect do + described_class.new(query: 'state', value: :dead_letter, queried_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'workflow query "state" must return a workflow state') + expect do + described_class.new(query: 'current-steps', value: 'capture_payment', queried_at:) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'workflow query "current-steps" must return an Array') + end + + it 'rejects invalid timestamps and unsupported low-level query values' do + expect do + described_class.new(query: 'state', value: :running, queried_at: '2026-04-26T12:00:00Z') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'queried_at must be a Time') + + value = described_class.const_get(:Value, false) + expect do + value.new('unsupported', nil).normalize + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unsupported workflow query "unsupported"') + end +end diff --git a/core/karya/spec/karya/workflow/snapshot_spec.rb b/core/karya/spec/karya/workflow/snapshot_spec.rb index 29dae6e..b5c1dde 100644 --- a/core/karya/spec/karya/workflow/snapshot_spec.rb +++ b/core/karya/spec/karya/workflow/snapshot_spec.rb @@ -40,7 +40,22 @@ def child_workflow(state: :running) ) end - def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil, child_workflows: [], child_workflow_ids_by_step_id: {}, parent: nil) + def interaction(kind: :signal, name: :manager_approved, payload: {}) + Karya::Workflow::InteractionSnapshot.new(kind:, name:, payload:, received_at: captured_at + 2) + end + + def snapshot( + jobs:, + step_job_ids: nil, + dependencies: {}, + rollback: nil, + child_workflows: [], + child_workflow_ids_by_step_id: {}, + interactions: [], + interaction_requirements_by_job_id: {}, + interaction_received_at_by_job_id: {}, + parent: nil + ) described_class.new( workflow_id: ' invoice_closeout ', batch_id: ' batch_1 ', @@ -50,6 +65,9 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil, child_wo jobs:, child_workflow_ids_by_step_id:, child_workflows:, + interactions:, + interaction_requirements_by_job_id:, + interaction_received_at_by_job_id:, parent:, rollback: ) @@ -86,12 +104,76 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil, child_wo expect(result.state_for_step(:child)).to eq(:queued) expect(result.rollback_requested?).to be(false) expect(result.rollback).to be_nil + expect(result.interactions).to eq([]) + expect(result.signals).to eq([]) + expect(result.events).to eq([]) expect(result).to be_frozen expect(result.jobs).to be_frozen expect(result.step_states).to be_frozen expect(result.state_counts).to be_frozen end + it 'exposes workflow interaction snapshots and filtered readers' do + signal = interaction(kind: :signal, name: :manager_approved, payload: { 'approved_by' => 'ops' }) + event = interaction(kind: :event, name: :payment_received, payload: { 'source' => 'stripe' }) + + result = snapshot(jobs: [job(id: 'job_root', state: :queued)], interactions: [signal, event]) + + expect(result.interactions).to eq([signal, event]) + expect(result.signals).to eq([signal]) + expect(result.events).to eq([event]) + end + + it 'blocks waiting interaction-gated steps until the matching interaction is delivered' do + jobs = [job(id: 'job_capture', state: :queued)] + blocked = snapshot( + jobs:, + step_job_ids: { capture_payment: 'job_capture' }, + interaction_requirements_by_job_id: { 'job_capture' => { kind: :event, name: :payment_received } } + ) + ready = snapshot( + jobs:, + step_job_ids: { capture_payment: 'job_capture' }, + interaction_requirements_by_job_id: { 'job_capture' => { kind: :event, name: :payment_received } }, + interactions: [interaction(kind: :event, name: :payment_received)] + ) + + expect(blocked.fetch_step(:capture_payment)).to be_blocked + expect(blocked.state).to eq(:blocked) + expect(ready.fetch_step(:capture_payment)).to be_ready + expect(ready.state).to eq(:pending) + end + + it 'accepts explicit interaction delivery timestamps for step readiness separate from inspection history' do + jobs = [job(id: 'job_capture', state: :queued)] + result = snapshot( + jobs:, + step_job_ids: { capture_payment: 'job_capture' }, + interaction_requirements_by_job_id: { 'job_capture' => { kind: :event, name: :payment_received } }, + interaction_received_at_by_job_id: { 'job_capture' => captured_at + 3 }, + interactions: [] + ) + + expect(result.fetch_step(:capture_payment)).to be_ready + expect(result.interactions).to eq([]) + end + + it 'marks every step that shares one interaction requirement as ready once delivered' do + jobs = [job(id: 'job_capture', state: :queued), job(id: 'job_notify', state: :queued)] + result = snapshot( + jobs:, + step_job_ids: { capture_payment: 'job_capture', notify_customer: 'job_notify' }, + interaction_requirements_by_job_id: { + 'job_capture' => { kind: :event, name: :payment_received }, + 'job_notify' => { kind: :event, name: :payment_received } + }, + interactions: [interaction(kind: :event, name: :payment_received)] + ) + + expect(result.fetch_step(:capture_payment)).to be_ready + expect(result.fetch_step(:notify_customer)).to be_ready + end + it 'exposes rollback metadata when requested' do result = snapshot(jobs: [job(id: 'job_root', state: :failed)], rollback:) @@ -191,6 +273,25 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil, child_wo expect do snapshot(jobs:, step_job_ids: { child: 'job_child' }, child_workflows: 'child') end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflows must be an Array') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, interactions: 'signal') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interactions must be an Array') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, interactions: ['signal']) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interactions entries must be Karya::Workflow::InteractionSnapshot') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, interaction_received_at_by_job_id: 'signal') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_received_at_by_job_id must be a Hash') + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + interaction_received_at_by_job_id: { + ' job_child ' => captured_at + 1, + job_child: captured_at + 2 + } + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'duplicate interaction delivery job "job_child"') expect do snapshot(jobs:, step_job_ids: { child: 'job_child' }, child_workflows: ['child']) end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflows entries must be Karya::Workflow::ChildWorkflowSnapshot') @@ -368,6 +469,53 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil, child_wo end.to raise_error(Karya::Workflow::InvalidExecutionError, 'step_job_ids must match jobs in order') end + it 'validates interaction requirement metadata' do + jobs = [job(id: 'job_child', state: :queued)] + + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, interaction_requirements_by_job_id: 'signal') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_requirements_by_job_id must be a Hash') + + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, interaction_requirements_by_job_id: { 'job_child' => 'signal' }) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction requirement must be a Hash') + + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + interaction_requirements_by_job_id: { 'job_child' => { kind: :webhook, name: :payment_received } } + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction requirement kind must be :signal or :event') + + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + interaction_requirements_by_job_id: { 'job_child' => { kind: 123, name: :payment_received } } + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction requirement kind must be :signal or :event') + + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + interaction_requirements_by_job_id: { 'job_child' => { kind: :signal, name: ' ' } } + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_name must be present') + + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + interaction_requirements_by_job_id: { + ' job_child ' => { kind: :signal, name: :manager_approved }, + job_child: { kind: :signal, name: :manager_approved } + } + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'duplicate interaction requirement job "job_child"') + end + it 'derives workflow states' do expect(snapshot(jobs: [job(id: 'job_root', state: :queued)]).state).to eq(:pending) expect(snapshot(jobs: [job(id: 'job_root', state: :reserved)]).state).to eq(:running) diff --git a/core/karya/spec/karya/workflow/step_snapshot_spec.rb b/core/karya/spec/karya/workflow/step_snapshot_spec.rb index 9f8c84c..63d013e 100644 --- a/core/karya/spec/karya/workflow/step_snapshot_spec.rb +++ b/core/karya/spec/karya/workflow/step_snapshot_spec.rb @@ -18,7 +18,13 @@ def job(id: 'job-child', state: :queued) Karya::Job.new(id:, queue: :billing, handler: :sync_billing, state:, created_at:) end - def snapshot(state: :queued, prerequisite_states: { 'job-root' => :succeeded }) + def snapshot( + state: :queued, + prerequisite_states: { 'job-root' => :succeeded }, + interaction_kind: nil, + interaction_name: nil, + interaction_received_at: nil + ) described_class.new( workflow_id: ' invoice_closeout ', batch_id: ' batch_1 ', @@ -26,7 +32,10 @@ def snapshot(state: :queued, prerequisite_states: { 'job-root' => :succeeded }) job_id: ' job-child ', job: job(state:), prerequisite_job_ids: [' job-root '], - prerequisite_states: + prerequisite_states:, + interaction_kind:, + interaction_name:, + interaction_received_at: ) end @@ -70,6 +79,25 @@ def child_workflow(state) expect(snapshot(state: :succeeded)).to be_terminal end + it 'blocks waiting steps until their required interaction arrives' do + blocked = snapshot(interaction_kind: :signal, interaction_name: :manager_approved) + normalized_from_string = snapshot(interaction_kind: 'signal', interaction_name: :manager_approved) + ready = snapshot( + interaction_kind: :event, + interaction_name: :payment_received, + interaction_received_at: created_at + 1 + ) + + expect(blocked).to be_blocked + expect(normalized_from_string.interaction_kind).to eq(:signal) + expect(ready).to be_ready + expect(ready).to have_attributes( + interaction_kind: :event, + interaction_name: 'payment_received', + interaction_received_at: created_at + 1 + ) + end + it 'blocks child workflow steps until the child workflow succeeds' do missing_child = described_class.new( workflow_id: :invoice_closeout, @@ -269,4 +297,34 @@ def child_workflow(state) expect(result.prerequisite_states).to eq('job-root' => :succeeded) expect(missing.prerequisite_states).to eq('job-root' => nil) end + + it 'validates interaction metadata' do + expect do + snapshot(interaction_kind: :webhook, interaction_name: :manager_approved) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_kind must be :signal or :event') + + expect do + snapshot(interaction_kind: 123, interaction_name: :manager_approved) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_kind must be :signal or :event') + + expect do + snapshot(interaction_kind: :signal, interaction_name: :manager_approved, interaction_received_at: '2026-04-24T12:00:00Z') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_received_at must be a Time') + + expect do + snapshot(interaction_kind: :signal, interaction_name: nil) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_kind and interaction_name must both be present or both be nil') + + expect do + snapshot(interaction_kind: :signal, interaction_name: ' ') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_name must be present') + + expect do + snapshot(interaction_kind: nil, interaction_name: :manager_approved) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_kind and interaction_name must both be present or both be nil') + + expect do + snapshot(interaction_kind: nil, interaction_name: nil, interaction_received_at: created_at + 1) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'interaction_received_at requires interaction_kind and interaction_name') + end end diff --git a/core/karya/spec/karya/workflow/step_spec.rb b/core/karya/spec/karya/workflow/step_spec.rb index 6562d3d..77d666b 100644 --- a/core/karya/spec/karya/workflow/step_spec.rb +++ b/core/karya/spec/karya/workflow/step_spec.rb @@ -45,6 +45,24 @@ expect(step.child_workflow).to eq('payment_subflow') end + it 'normalizes optional signal and event gates' do + signal_step = described_class.new( + id: :capture_payment, + handler: :capture_payment, + wait_for_signal: ' manager-approved ' + ) + event_step = described_class.new( + id: :capture_payment, + handler: :capture_payment, + wait_for_event: ' payment-received ' + ) + + expect(signal_step.wait_for_signal).to eq('manager-approved') + expect(signal_step.wait_for_event).to be_nil + expect(event_step.wait_for_event).to eq('payment-received') + expect(event_step.wait_for_signal).to be_nil + end + it 'defaults compensation metadata to no-op rollback behavior' do step = described_class.new(id: :capture_payment, handler: :capture_payment) @@ -89,6 +107,20 @@ end.to raise_error(ArgumentError, 'unknown keywords: :unknown, :extra') end + it 'rejects steps that wait for both signal and event' do + expect do + described_class.new( + id: :capture_payment, + handler: :capture_payment, + wait_for_signal: :manager_approved, + wait_for_event: :payment_received + ) + end.to raise_error( + Karya::Workflow::InvalidDefinitionError, + 'workflow step "capture_payment" cannot wait for both signal and event' + ) + end + it 'rejects duplicate dependency ids after normalization' do expect do described_class.new(id: :emit_receipt, handler: :emit_receipt, depends_on: [:calculate_totals, ' calculate_totals ']) diff --git a/docs/pages/workflows/signals.md b/docs/pages/workflows/signals.md index 8467d65..b49e9b6 100644 --- a/docs/pages/workflows/signals.md +++ b/docs/pages/workflows/signals.md @@ -7,8 +7,8 @@ permalink: /workflows/signals/ # Workflow Signals -Karya supports live interaction with running workflows through signals, queries, -and external events. +Karya supports live interaction with running workflows through signals, +queries, external events, and operator checkpoints. ## Covered Behavior