diff --git a/core/karya/lib/karya/queue_store/base.rb b/core/karya/lib/karya/queue_store/base.rb index 0cf4301d..0a46b928 100644 --- a/core/karya/lib/karya/queue_store/base.rb +++ b/core/karya/lib/karya/queue_store/base.rb @@ -90,6 +90,33 @@ def workflow_snapshot(batch_id:, now:) raise NotImplementedError, "#{self.class} must implement ##{__method__}" end + # Enqueue a child workflow batch for one declared parent child step. + def enqueue_child_workflow( + parent_batch_id:, + parent_step_id:, + definition:, + jobs_by_step_id:, + batch_id:, + now:, + compensation_jobs_by_step_id: {} + ) + _parent_batch_id = parent_batch_id + _parent_step_id = parent_step_id + _definition = definition + _jobs_by_step_id = jobs_by_step_id + _batch_id = batch_id + _now = now + _compensation_jobs_by_step_id = compensation_jobs_by_step_id + raise NotImplementedError, "#{self.class} must implement ##{__method__}" + end + + # Synchronize terminal child workflow state into parent child-step jobs. + def sync_child_workflows(parent_batch_id:, now:) + _parent_batch_id = parent_batch_id + _now = now + raise NotImplementedError, "#{self.class} must implement ##{__method__}" + end + # Trigger explicit saga rollback for one failed workflow batch. def rollback_workflow(batch_id:, now:, reason:) _batch_id = batch_id diff --git a/core/karya/lib/karya/queue_store/bulk_mutation_report.rb b/core/karya/lib/karya/queue_store/bulk_mutation_report.rb index 6e677dec..2c6bca82 100644 --- a/core/karya/lib/karya/queue_store/bulk_mutation_report.rb +++ b/core/karya/lib/karya/queue_store/bulk_mutation_report.rb @@ -17,7 +17,9 @@ class BulkMutationReport replay_dead_letter_jobs retry_dead_letter_jobs discard_dead_letter_jobs + enqueue_child_workflow rollback_workflow + sync_child_workflows retry_workflow_steps dead_letter_workflow_steps replay_workflow_steps @@ -128,9 +130,9 @@ def skipped_reason_error_message def action_error_message 'action must be one of :enqueue_many, :retry_jobs, :cancel_jobs, :dead_letter_jobs, ' \ - ':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, :rollback_workflow, ' \ + ':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, :enqueue_child_workflow, :rollback_workflow, ' \ ':retry_workflow_steps, :dead_letter_workflow_steps, :replay_workflow_steps, ' \ - ':retry_dead_letter_workflow_steps, or :discard_workflow_steps' + ':retry_dead_letter_workflow_steps, :discard_workflow_steps, or :sync_child_workflows' end private_constant :ACTIONS, :JobIdList, :JobList, :SKIPPED_JOB_REASONS diff --git a/core/karya/lib/karya/queue_store/in_memory.rb b/core/karya/lib/karya/queue_store/in_memory.rb index f021b39f..1ccb4c5f 100644 --- a/core/karya/lib/karya/queue_store/in_memory.rb +++ b/core/karya/lib/karya/queue_store/in_memory.rb @@ -32,21 +32,21 @@ module QueueStore # Single-process reference implementation for queue submission and reservation behavior. # # InMemory is intentionally ephemeral and suitable for development, tests, - # examples, and as the executable reference for `QueueStore::Base` - # semantics. It is not a durable backend: jobs, queue indexes, reservations, + # examples, and as the executable reference for `QueueStore::Base` semantics. + # It is not a durable backend: jobs, queue indexes, reservations, # active executions, retry state, and expired-token tombstones live only in # process memory and are lost on restart. Production deployments that need # durable enqueue acknowledgment or restart/takeover recovery must use a # shared persistent backend implementing the Base durability contract. class InMemory # Owner-local implementation helpers for the executable reference store. - module Internal - end + module Internal; end include Base include Internal::BatchSupport include Internal::BackpressureSupport include Internal::BackpressureSnapshotSupport + include Internal::ChildWorkflowSupport include Internal::DeadLetterSupport include Internal::ExecutionSupport include Internal::ExpirationSupport @@ -60,8 +60,7 @@ module Internal include Internal::UniquenessSupport include Internal::WorkflowSupport - DEFAULT_EXPIRED_TOMBSTONE_LIMIT = 1024 - DEFAULT_COMPLETED_BATCH_RETENTION_LIMIT = 1024 + DEFAULT_EXPIRED_TOMBSTONE_LIMIT = DEFAULT_COMPLETED_BATCH_RETENTION_LIMIT = 1024 DEFAULT_MAX_BATCH_SIZE = 1000 RESERVE_QUEUES_ERROR_MESSAGE = 'provide exactly one of queue or queues' diff --git a/core/karya/lib/karya/queue_store/in_memory/internal.rb b/core/karya/lib/karya/queue_store/in_memory/internal.rb index f1ee3048..6213d525 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal.rb @@ -8,6 +8,8 @@ require_relative 'internal/backpressure_support' require_relative 'internal/backpressure_snapshot_support' require_relative 'internal/batch_support' +require_relative 'internal/child_workflow_support' +require_relative 'internal/workflow_child_state' require_relative 'internal/dead_letter_support' require_relative 'internal/execution_recovery' require_relative 'internal/execution_support' @@ -24,4 +26,5 @@ require_relative 'internal/retry_support' require_relative 'internal/store_state' require_relative 'internal/uniqueness_support' +require_relative 'internal/workflow_child_ids' require_relative 'internal/workflow_support' diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/child_workflow_support.rb b/core/karya/lib/karya/queue_store/in_memory/internal/child_workflow_support.rb new file mode 100644 index 00000000..0f8de062 --- /dev/null +++ b/core/karya/lib/karya/queue_store/in_memory/internal/child_workflow_support.rb @@ -0,0 +1,258 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module QueueStore + class InMemory + module Internal + # Owner-local child workflow enqueue and lifecycle sync support. + module ChildWorkflowSupport + def enqueue_child_workflow( + parent_batch_id:, + parent_step_id:, + definition:, + jobs_by_step_id:, + batch_id:, + now:, + compensation_jobs_by_step_id: {} + ) + request = ChildWorkflowRequest.new( + parent_batch_id: Workflow.send(:normalize_batch_identifier, :parent_batch_id, parent_batch_id), + parent_step_id: Workflow.send(:normalize_execution_identifier, :parent_step_id, parent_step_id), + now: normalize_time(:now, now, error_class: Workflow::InvalidExecutionError) + ) + + @mutex.synchronize do + parent_batch_id = request.parent_batch_id + parent_step_id = request.parent_step_id + parent = prepare_child_workflow_parent(parent_batch_id:, parent_step_id:, definition:) + binding = Workflow.send(:build_compensated_execution_binding, definition:, jobs_by_step_id:, batch_id:, compensation_jobs_by_step_id:) + validate_child_batch_identity(parent_batch_id:, child_batch_id: binding.batch_id) + enqueue_child_workflow_binding(parent:, parent_step_id:, binding:, definition:, now: request.now) + end + end + + def sync_child_workflows(parent_batch_id:, now:) + request = ChildWorkflowSyncRequest.new( + parent_batch_id: Workflow.send(:normalize_batch_identifier, :parent_batch_id, parent_batch_id), + now: normalize_time(:now, now, error_class: Workflow::InvalidExecutionError) + ) + + @mutex.synchronize do + parent_batch_id = request.parent_batch_id + now = request.now + fetch_workflow_registration(fetch_batch(parent_batch_id).id) + relationships = child_relationships_for_parent_batch(parent_batch_id) + Karya::Internal::BulkMutation::ReportBuilder.new( + action: :sync_child_workflows, + job_ids: relationships.map(&:parent_job_id), + now: + ).to_report do |job_id, changed_jobs, skipped_jobs| + sync_child_workflow_job(job_id:, now:, changed_jobs:, skipped_jobs:) + end + end + end + + private + + # Normalized child workflow enqueue request. + class ChildWorkflowRequest + attr_reader :now, :parent_batch_id, :parent_step_id + + def initialize(parent_batch_id:, parent_step_id:, now:) + @now = now + @parent_batch_id = parent_batch_id + @parent_step_id = parent_step_id + end + end + + # Normalized child workflow sync request. + class ChildWorkflowSyncRequest + attr_reader :now, :parent_batch_id + + def initialize(parent_batch_id:, now:) + @now = now + @parent_batch_id = parent_batch_id + end + end + + # Groups the parent-side child workflow step identity. + ParentChildWorkflow = Struct.new(:parent_workflow_id, :parent_batch_id, :parent_job_id) + + # Builds step-to-job metadata in definition order for child enqueues. + class ChildStepJobIds + def initialize(definition:, jobs:) + @definition = definition + @jobs = jobs + end + + def to_h + definition.steps.each_with_object({}).with_index do |(workflow_step, step_jobs), index| + step_jobs[workflow_step.id] = jobs.fetch(index).id + end.freeze + end + + private + + attr_reader :definition, :jobs + end + + private_constant :ChildStepJobIds, + :ChildWorkflowRequest, + :ChildWorkflowSyncRequest, + :ParentChildWorkflow + + def enqueue_child_workflow_binding(parent:, parent_step_id:, binding:, definition:, now:) + jobs = binding.jobs + batch = build_enqueue_batch(batch_id: binding.batch_id, jobs:, now:) + validate_bulk_enqueue_uniqueness(jobs, now) + expire_reservations_locked(now) + queued_jobs = jobs.map { |job| enqueue_validated_job(job, now) } + store_batch(batch) + state.register_workflow_dependencies(binding.dependency_job_ids_by_job_id) + ChildWorkflowMetadata.new(state:, parent:, parent_step_id:, binding:, definition:).register + ChildWorkflowReport.new(binding:, queued_jobs:, now:).to_report + end + + # Registers child workflow metadata after enqueue validation succeeds. + class ChildWorkflowMetadata + def initialize(state:, parent:, parent_step_id:, binding:, definition:) + @state = state + @parent = parent + @parent_step_id = parent_step_id + @binding = binding + @definition = definition + end + + def register + batch_id = binding.batch_id + workflow_id = definition.id + state.register_workflow( + batch_id:, + workflow_id:, + step_job_ids: ChildStepJobIds.new(definition:, jobs: binding.jobs).to_h, + dependency_job_ids_by_job_id: binding.dependency_job_ids_by_job_id, + compensation_jobs_by_step_id: binding.compensation_jobs_by_step_id, + child_workflow_ids_by_step_id: WorkflowChildIds.new(definition).to_h + ) + state.workflow_children.register( + parent_workflow_id: parent.parent_workflow_id, + parent_batch_id: parent.parent_batch_id, + parent_step_id:, + parent_job_id: parent.parent_job_id, + child_workflow_id: workflow_id, + child_batch_id: batch_id + ) + end + + private + + attr_reader :binding, :definition, :parent, :parent_step_id, :state + end + + # Builds the public child workflow enqueue report. + class ChildWorkflowReport + def initialize(binding:, queued_jobs:, now:) + @binding = binding + @queued_jobs = queued_jobs + @now = now + end + + def to_report + BulkMutationReport.new( + action: :enqueue_child_workflow, + performed_at: now, + requested_job_ids: binding.jobs.map(&:id), + changed_jobs: queued_jobs, + skipped_jobs: [] + ) + end + + private + + attr_reader :binding, :now, :queued_jobs + end + + private_constant :ChildWorkflowMetadata, :ChildWorkflowReport + + def prepare_child_workflow_parent(parent_batch_id:, parent_step_id:, definition:) + parent_batch = fetch_batch(parent_batch_id) + batch_id = parent_batch.id + parent_registration = fetch_workflow_registration(batch_id) + expected_child_workflow_id = parent_registration.child_workflow_ids_by_step_id[parent_step_id] + validate_child_workflow_parent_step(parent_step_id, expected_child_workflow_id) + validate_child_workflow_definition(definition, expected_child_workflow_id, parent_step_id) + validate_child_workflow_not_registered(batch_id, parent_step_id) + validate_child_workflow_parent_job(parent_registration, parent_step_id, batch_id) + end + + def validate_child_workflow_parent_step(parent_step_id, expected_child_workflow_id) + return if expected_child_workflow_id + + raise Workflow::InvalidExecutionError, "workflow step #{parent_step_id.inspect} is not a child workflow step" + end + + def validate_child_workflow_definition(definition, expected_child_workflow_id, parent_step_id) + raise Workflow::InvalidExecutionError, 'definition must be a Karya::Workflow::Definition' unless definition.is_a?(Workflow::Definition) + + workflow_id = definition.id + return if expected_child_workflow_id == workflow_id + + raise Workflow::InvalidExecutionError, "child workflow #{workflow_id.inspect} does not match parent step #{parent_step_id.inspect}" + end + + def validate_child_workflow_not_registered(parent_batch_id, parent_step_id) + return unless child_relationship(parent_batch_id, parent_step_id) + + raise Workflow::InvalidExecutionError, "child workflow already registered for step #{parent_step_id.inspect}" + end + + def validate_child_workflow_parent_job(parent_registration, parent_step_id, parent_batch_id) + parent_job_id = parent_registration.step_job_ids.fetch(parent_step_id) + parent_job = state.jobs_by_id.fetch(parent_job_id) + raise Workflow::InvalidExecutionError, "parent child workflow step #{parent_step_id.inspect} must be queued" unless parent_job.state == :queued + + ParentChildWorkflow.new(parent_registration.workflow_id, parent_batch_id, parent_job_id).freeze + end + + def validate_child_batch_identity(parent_batch_id:, child_batch_id:) + return unless parent_batch_id == child_batch_id + + raise Workflow::InvalidExecutionError, 'child workflow batch id must differ from parent batch id' + end + + def child_relationship(parent_batch_id, parent_step_id) + state.workflow_children.for_parent_step(parent_batch_id, parent_step_id) + end + + def child_relationships_for_parent_batch(parent_batch_id) + state.workflow_children.for_parent_batch(parent_batch_id) + end + + def sync_child_workflow_job(job_id:, now:, changed_jobs:, skipped_jobs:) + relationship = state.workflow_children.for_parent_job(job_id) + child_batch_id = relationship.child_batch_id + case child_workflow_state(child_batch_id, now:) + when :failed + dead_letter_requested_job(job_id, now, "child workflow #{child_batch_id} failed", changed_jobs, skipped_jobs) + when :cancelled + cancel_requested_job(job_id, now, changed_jobs, skipped_jobs) + else + parent_job = state.jobs_by_id.fetch(job_id) + skipped_jobs << Karya::Internal::BulkMutation::SkippedJob.new(job_id:, reason: :ineligible_state, state: parent_job.state).to_h + end + end + + def child_workflow_state(child_batch_id, now:) + batch_id = fetch_batch(child_batch_id).id + WorkflowChildState.new(state:, now:).resolve(batch_id) + end + end + end + end + end +end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/reserve_selection_support.rb b/core/karya/lib/karya/queue_store/in_memory/internal/reserve_selection_support.rb index f8fa64fa..ce3288a3 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal/reserve_selection_support.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal/reserve_selection_support.rb @@ -83,7 +83,7 @@ def matching_job_for(queue, handler_matcher, reserve_scan_state, now) def reservable_candidate?(queued_job, handler_matcher, reserve_scan_state, now) return false unless handler_matcher.include?(queued_job.handler) - return false unless workflow_dependencies_satisfied?(queued_job) + return false unless workflow_dependencies_satisfied?(queued_job, now:) return false if reserve_scan_state.concurrency_blocked?(queued_job) return false if reserve_scan_state.rate_limited?(queued_job) return false if circuit_breaker_blocked?(queued_job, now) diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb b/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb index 7bd99b6a..f17657b9 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb @@ -32,25 +32,277 @@ class StoreState :reservation_tokens_in_order, :reservations_by_token, :stuck_job_recoveries_by_id, + :workflow_children, :workflow_dependency_job_ids_by_job_id, :workflow_rollback_batch_ids, :workflow_registrations_by_batch_id, :workflow_rollbacks_by_batch_id # Immutable owner-local workflow registration metadata for one batch. - WorkflowRegistration = Struct.new(:workflow_id, :step_job_ids, :dependency_job_ids_by_job_id, :compensation_jobs_by_step_id) + WorkflowRegistration = Struct.new( + :workflow_id, + :step_job_ids, + :dependency_job_ids_by_job_id, + :compensation_jobs_by_step_id, + :child_workflow_ids_by_step_id + ) # Immutable owner-local rollback metadata for one workflow batch. WorkflowRollback = Struct.new(:batch_id, :rollback_batch_id, :reason, :requested_at, :compensation_job_ids) + # Owner-local child workflow relationship registry. + class WorkflowChildren + # Immutable owner-local child workflow relationship metadata. + Relationship = Struct.new( + :parent_workflow_id, + :parent_batch_id, + :parent_step_id, + :parent_job_id, + :child_workflow_id, + :child_batch_id + ) + private_constant :Relationship + + def initialize + @by_child_batch_id = {} + @by_parent_batch_id = {} + @by_parent_job_id = {} + @expected_child_workflow_id_by_job_id = {} + end + + attr_reader :expected_child_workflow_id_by_job_id + + def register_expected_child(parent_job_id, child_workflow_id) + expected_child_workflow_id_by_job_id[parent_job_id] = child_workflow_id + end + + def register(parent_workflow_id:, parent_batch_id:, parent_step_id:, parent_job_id:, child_workflow_id:, child_batch_id:) + relationship = Relationship.new( + parent_workflow_id, + parent_batch_id, + parent_step_id, + parent_job_id, + child_workflow_id, + child_batch_id + ).freeze + ensure_parent_relationships(parent_batch_id)[parent_step_id] = relationship + @by_parent_job_id[parent_job_id] = relationship + @by_child_batch_id[child_batch_id] = relationship + relationship + end + + def for_parent_step(parent_batch_id, parent_step_id) + parent_relationships(parent_batch_id)[parent_step_id] + end + + def for_parent_batch(parent_batch_id) + parent_relationships(parent_batch_id).values + end + + def for_parent_job(parent_job_id) + @by_parent_job_id[parent_job_id] + end + + def for_child_batch(child_batch_id) + @by_child_batch_id[child_batch_id] + end + + def delete_by_parent_batch(parent_batch_id) + relationships = @by_parent_batch_id.delete(parent_batch_id) + return [] unless relationships + + relationships.each_value.map do |relationship| + delete_relationship(relationship, remove_parent_batch: false) + end + end + + def delete_by_child_batch(child_batch_id) + relationship = @by_child_batch_id[child_batch_id] + return unless relationship + + delete_relationship(relationship) + end + + def delete_expected_children(parent_job_ids) + parent_job_ids.each { |parent_job_id| @expected_child_workflow_id_by_job_id.delete(parent_job_id) } + end + + private + + def parent_relationships(parent_batch_id) + @by_parent_batch_id[parent_batch_id] || {} + end + + def ensure_parent_relationships(parent_batch_id) + @by_parent_batch_id[parent_batch_id] ||= {} + end + + def delete_relationship(relationship, remove_parent_batch: true) + child_batch_id = relationship.child_batch_id + parent_job_id = relationship.parent_job_id + parent_batch_id = relationship.parent_batch_id + parent_step_id = relationship.parent_step_id + + @by_child_batch_id.delete(child_batch_id) + @by_parent_job_id.delete(parent_job_id) + @expected_child_workflow_id_by_job_id.delete(parent_job_id) + return relationship unless remove_parent_batch + + relationships = @by_parent_batch_id[parent_batch_id] + return relationship unless relationships + + relationships.delete(parent_step_id) + @by_parent_batch_id.delete(parent_batch_id) if relationships.empty? + relationship + end + end + + # Decides whether a terminal child batch must remain because its parent is still active. + class ChildBatchRetention + def initialize(batches_by_id:, workflow_children:, terminal_batch:) + @batches_by_id = batches_by_id + @workflow_children = workflow_children + @terminal_batch = terminal_batch + end + + def retain?(batch_id) + relationship = workflow_children.for_child_batch(batch_id) + return false unless relationship + + parent_batch = batches_by_id[relationship.parent_batch_id] + parent_batch && !terminal_batch.call(parent_batch) + end + + private + + attr_reader :batches_by_id, :terminal_batch, :workflow_children + end + + # Prunes terminal batches while respecting active parent-child relationships. + class TerminalBatchPruner + def initialize(batch_indexes:, workflow_indexes:) + @batch_indexes = batch_indexes + @workflow_indexes = workflow_indexes + end + + def call(retention_limit:, child_batch_retention:) + pruned_batch_ids = [] + inspected_batch_count = 0 + + loop do + terminal_batch_count = terminal_batch_ids_in_order.length + break unless terminal_batch_count > retention_limit && inspected_batch_count < terminal_batch_count + + batch_id = terminal_batch_ids_in_order.shift + if child_batch_retention.retain?(batch_id) + terminal_batch_ids_in_order << batch_id + inspected_batch_count += 1 + next + end + + inspected_batch_count = 0 + terminal_batch_ids_index.delete(batch_id) + batch = batches_by_id.delete(batch_id) + if batch + cleanup_batch(batch_id:, batch:) + pruned_batch_ids << batch_id + else + cleanup_batch(batch_id:, batch: nil) + end + end + + pruned_batch_ids + end + + private + + attr_reader :batch_indexes, :workflow_indexes + + def batch_id_by_job_id + batch_indexes.fetch(:batch_id_by_job_id) + end + + def batches_by_id + batch_indexes.fetch(:batches_by_id) + end + + def terminal_batch_ids_in_order + batch_indexes.fetch(:terminal_batch_ids_in_order) + end + + def terminal_batch_ids_index + batch_indexes.fetch(:terminal_batch_ids_index) + end + + def workflow_children + workflow_indexes.fetch(:workflow_children) + end + + def workflow_dependency_job_ids_by_job_id + workflow_indexes.fetch(:workflow_dependency_job_ids_by_job_id) + end + + def workflow_registrations_by_batch_id + workflow_indexes.fetch(:workflow_registrations_by_batch_id) + end + + def workflow_rollback_batch_ids + workflow_indexes.fetch(:workflow_rollback_batch_ids) + end + + def workflow_rollbacks_by_batch_id + workflow_indexes.fetch(:workflow_rollbacks_by_batch_id) + end + + def cleanup_batch(batch_id:, batch:) + PrunedBatchCleanup.call( + batch_id:, + batch:, + job_indexes: { + batch_id_by_job_id:, + workflow_dependency_job_ids_by_job_id: + }, + workflow_indexes: { + workflow_children:, + workflow_rollback_batch_ids:, + workflow_registrations_by_batch_id:, + workflow_rollbacks_by_batch_id: + } + ) + end + end + # Workflow registration writers kept separate from generic store state. module WorkflowMetadata - def register_workflow(batch_id:, workflow_id:, step_job_ids:, dependency_job_ids_by_job_id:, compensation_jobs_by_step_id:) - workflow_registrations_by_batch_id[batch_id] = WorkflowRegistration.new( + def register_workflow( + batch_id:, + workflow_id:, + step_job_ids:, + dependency_job_ids_by_job_id:, + compensation_jobs_by_step_id:, + child_workflow_ids_by_step_id: {} + ) + registration = WorkflowRegistration.new( workflow_id, step_job_ids.dup.freeze, dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze }.freeze, - compensation_jobs_by_step_id.dup.freeze + compensation_jobs_by_step_id.dup.freeze, + child_workflow_ids_by_step_id.dup.freeze ).freeze + workflow_registrations_by_batch_id[batch_id] = registration + child_workflow_ids_by_step_id.each do |step_id, child_workflow_id| + workflow_children.register_expected_child(step_job_ids.fetch(step_id), child_workflow_id) + end + registration + end + + def register_workflow_dependencies(dependency_job_ids_by_job_id) + workflow_dependency_job_ids_by_job_id.merge!( + dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze } + ) + end + + def workflow_dependency_job_ids_for(job_id) + workflow_dependency_job_ids_by_job_id[job_id] end def register_workflow_rollback(batch_id:, rollback_batch_id:, reason:, requested_at:, compensation_job_ids:) @@ -67,7 +319,12 @@ def register_workflow_rollback(batch_id:, rollback_batch_id:, reason:, requested include WorkflowMetadata - private_constant :WorkflowMetadata, :WorkflowRegistration, :WorkflowRollback + private_constant :ChildBatchRetention, + :TerminalBatchPruner, + :WorkflowChildren, + :WorkflowMetadata, + :WorkflowRegistration, + :WorkflowRollback def initialize(expired_tombstone_limit:) @batches_by_id = {} @@ -94,6 +351,7 @@ def initialize(expired_tombstone_limit:) @stuck_job_recoveries_by_id = {} @terminal_batch_ids_index = {} @terminal_batch_ids_in_order = [] + @workflow_children = WorkflowChildren.new @workflow_dependency_job_ids_by_job_id = {} @workflow_rollback_batch_ids = {} @workflow_registrations_by_batch_id = {} @@ -243,65 +501,49 @@ def prune_terminal_batches(retention_limit, changed_job: nil) batch_id = @batch_id_by_job_id[changed_job.id] return [] unless batch_id - batch = batches_by_id[batch_id] - if batch - batch_terminal = terminal_batch?(batch) - batch_tracked = @terminal_batch_ids_index[batch_id] - case [batch_terminal, batch_tracked] - when [true, false], [true, nil] - @terminal_batch_ids_index[batch_id] = true - @terminal_batch_ids_in_order << batch_id - when [false, true] - @terminal_batch_ids_index[batch_id] = false - @terminal_batch_ids_in_order.delete(batch_id) - end - end + track_terminal_batch(batch_id) end - pruned_batch_ids = [] - - while @terminal_batch_ids_in_order.length > retention_limit - batch_id = @terminal_batch_ids_in_order.shift - @terminal_batch_ids_index.delete(batch_id) - batch = batches_by_id.delete(batch_id) - unless batch - PrunedBatchCleanup.call( - batch_id:, - batch: nil, - job_indexes: { - batch_id_by_job_id: @batch_id_by_job_id, - workflow_dependency_job_ids_by_job_id: - }, - workflow_indexes: { - workflow_rollback_batch_ids:, - workflow_registrations_by_batch_id:, - workflow_rollbacks_by_batch_id: - } - ) - next - end - - PrunedBatchCleanup.call( - batch_id:, - batch:, - job_indexes: { - batch_id_by_job_id: @batch_id_by_job_id, - workflow_dependency_job_ids_by_job_id: - }, - workflow_indexes: { - workflow_rollback_batch_ids:, - workflow_registrations_by_batch_id:, - workflow_rollbacks_by_batch_id: - } - ) - pruned_batch_ids << batch_id - end - - pruned_batch_ids + child_batch_retention = ChildBatchRetention.new( + batches_by_id:, + workflow_children:, + terminal_batch: method(:terminal_batch?) + ) + TerminalBatchPruner.new( + batch_indexes: { + terminal_batch_ids_in_order: @terminal_batch_ids_in_order, + terminal_batch_ids_index: @terminal_batch_ids_index, + batches_by_id:, + batch_id_by_job_id: @batch_id_by_job_id + }, + workflow_indexes: { + workflow_dependency_job_ids_by_job_id:, + workflow_children:, + workflow_rollback_batch_ids:, + workflow_registrations_by_batch_id:, + workflow_rollbacks_by_batch_id: + } + ).call(retention_limit:, child_batch_retention:) end private + def track_terminal_batch(batch_id) + batch = batches_by_id[batch_id] + return unless batch + + batch_terminal = terminal_batch?(batch) + batch_tracked = @terminal_batch_ids_index[batch_id] + case [batch_terminal, batch_tracked] + when [true, false], [true, nil] + @terminal_batch_ids_index[batch_id] = true + @terminal_batch_ids_in_order << batch_id + when [false, true] + @terminal_batch_ids_index[batch_id] = false + @terminal_batch_ids_in_order.delete(batch_id) + end + end + def terminal_batch?(batch) batch.job_ids.all? do |job_id| job = jobs_by_id[job_id] @@ -364,10 +606,44 @@ def cleanup_stale_batch_membership def cleanup_workflow_registration registration = workflow_registrations_by_batch_id.delete(batch_id) rollback = workflow_rollbacks_by_batch_id.delete(batch_id) + cleanup_child_workflows(registration) workflow_rollback_batch_ids.delete(rollback.rollback_batch_id) if rollback registration end + def cleanup_child_workflows(registration) + cleanup_expected_children(registration) + workflow_children.delete_by_parent_batch(batch_id) + workflow_children.delete_by_child_batch(batch_id) + end + + def cleanup_expected_children(registration) + ExpectedChildrenCleanup.new(registration, workflow_children).call + end + + # Deletes declared child markers for a registration even when no child relationship exists. + class ExpectedChildrenCleanup + def initialize(registration, workflow_children) + @registration = registration + @workflow_children = workflow_children + end + + def call + return unless registration + + workflow_children.delete_expected_children( + registration.child_workflow_ids_by_step_id.keys.map do |step_id| + registration.step_job_ids.fetch(step_id) + end + ) + end + + private + + attr_reader :registration, :workflow_children + end + private_constant :ExpectedChildrenCleanup + def batch_id_by_job_id job_indexes.fetch(:batch_id_by_job_id) end @@ -380,6 +656,10 @@ def workflow_rollback_batch_ids workflow_indexes.fetch(:workflow_rollback_batch_ids) end + def workflow_children + workflow_indexes.fetch(:workflow_children) + end + def workflow_registrations_by_batch_id workflow_indexes.fetch(:workflow_registrations_by_batch_id) end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_child_ids.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_child_ids.rb new file mode 100644 index 00000000..9573d123 --- /dev/null +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_child_ids.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module QueueStore + class InMemory + module Internal + # Builds step-to-child-workflow metadata in definition order. + class WorkflowChildIds + def initialize(definition) + @definition = definition + end + + def to_h + definition.steps.each_with_object({}) do |workflow_step, child_workflow_ids| + StepEntry.new(workflow_step).store_in(child_workflow_ids) + end.freeze + end + + private + + attr_reader :definition + + # Adds one declared child workflow id to an accumulator. + class StepEntry + def initialize(workflow_step) + @workflow_step = workflow_step + end + + def store_in(child_workflow_ids) + child_workflow_ids[id] = child_workflow if workflow_step.child_workflow? + end + + private + + attr_reader :workflow_step + + def id + workflow_step.id + end + + def child_workflow + workflow_step.child_workflow + end + end + + private_constant :StepEntry + end + end + end + end +end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_child_state.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_child_state.rb new file mode 100644 index 00000000..163e4cad --- /dev/null +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_child_state.rb @@ -0,0 +1,125 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module QueueStore + class InMemory + module Internal + # Builds nested child workflow snapshots and resolves workflow state for one batch. + class WorkflowChildState + def initialize(state:, now:, cache: {}, visiting: {}) + @store_state = state + @now = now + @cache = cache + @visiting = visiting + end + + def resolve(batch_id) + return cache.fetch(batch_id) if cache.key?(batch_id) + raise Workflow::InvalidExecutionError, "child workflow cycle detected at batch #{batch_id.inspect}" if visiting.key?(batch_id) + + added_to_visiting = false + visiting[batch_id] = true + added_to_visiting = true + cache[batch_id] = StateSnapshot.new(batch_id:, state: store_state, now:, cache:, visiting:).state + ensure + visiting.delete(batch_id) if added_to_visiting + end + + # Recursively builds one workflow snapshot state using registered child relationships. + class StateSnapshot + def initialize(batch_id:, state:, now:, cache:, visiting:) + @batch_id = batch_id + @store_state = state + @now = now + @cache = cache + @visiting = visiting + end + + def state + Workflow::Snapshot.new( + workflow_id: registration.workflow_id, + batch_id:, + captured_at: now, + step_job_ids: registration.step_job_ids, + dependency_job_ids_by_job_id: registration.dependency_job_ids_by_job_id, + jobs:, + child_workflow_ids_by_step_id: registration.child_workflow_ids_by_step_id, + child_workflows: + ).state + end + + private + + attr_reader :batch_id, :cache, :now, :store_state, :visiting + + def batch + @batch ||= store_state.batches_by_id.fetch(batch_id) + end + + def registration + @registration ||= store_state.workflow_registrations_by_batch_id.fetch(batch_id) + end + + def jobs + @jobs ||= batch.job_ids.map { |job_id| store_state.jobs_by_id.fetch(job_id) } + end + + def child_workflows + store_state.workflow_children.for_parent_batch(batch_id).map do |relationship| + RelationshipSnapshot.new(relationship:, store_state:, now:, cache:, visiting:).to_snapshot + end.freeze + end + + # Builds one nested child workflow snapshot from stored relationship metadata. + class RelationshipSnapshot + def initialize(relationship:, store_state:, now:, cache:, visiting:) + @relationship = relationship + @store_state = store_state + @now = now + @cache = cache + @visiting = visiting + end + + def to_snapshot + Workflow::ChildWorkflowSnapshot.new( + parent_workflow_id: relationship.parent_workflow_id, + parent_batch_id: relationship.parent_batch_id, + parent_step_id: relationship.parent_step_id, + parent_job_id: relationship.parent_job_id, + child_workflow_id: relationship.child_workflow_id, + child_batch_id: child_batch_id, + child_state: child_state + ) + end + + private + + attr_reader :cache, :now, :relationship, :store_state, :visiting + + def child_batch_id + relationship.child_batch_id + end + + def child_state + WorkflowChildState.new(state: store_state, now:, cache:, visiting:).resolve(child_batch_id) + end + end + + private_constant :RelationshipSnapshot + end + + private_constant :StateSnapshot + + private + + attr_reader :cache, :now, :store_state, :visiting + end + end + end + end +end diff --git a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb index ff7ed1e3..08de5c9c 100644 --- a/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb +++ b/core/karya/lib/karya/queue_store/in_memory/internal/workflow_support.rb @@ -30,15 +30,14 @@ def enqueue_workflow(definition:, jobs_by_step_id:, batch_id:, now:, compensatio queued_jobs = jobs.map { |job| enqueue_validated_job(job, normalized_now) } dependency_job_ids_by_job_id = binding.dependency_job_ids_by_job_id store_batch(batch) - state.workflow_dependency_job_ids_by_job_id.merge!( - dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze } - ) + state.register_workflow_dependencies(dependency_job_ids_by_job_id) state.register_workflow( batch_id: workflow_batch_id, workflow_id: definition.id, step_job_ids: StepJobIds.new(definition:, jobs:).to_h, dependency_job_ids_by_job_id:, - compensation_jobs_by_step_id: binding.compensation_jobs_by_step_id + compensation_jobs_by_step_id: binding.compensation_jobs_by_step_id, + child_workflow_ids_by_step_id: WorkflowChildIds.new(definition).to_h ) BulkMutationReport.new( action: :enqueue_many, @@ -65,9 +64,7 @@ def rollback_workflow(batch_id:, now:, reason:) queued_jobs = rollback_jobs.map { |job| enqueue_validated_job(job, normalized_now) } queued_job_ids = queued_jobs.map(&:id) store_batch(rollback_batch) if rollback_batch - state.workflow_dependency_job_ids_by_job_id.merge!( - rollback_plan.dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze } - ) + state.register_workflow_dependencies(rollback_plan.dependency_job_ids_by_job_id) state.register_workflow_rollback( batch_id: rollback.workflow_batch_id, rollback_batch_id: rollback.rollback_batch_id, @@ -426,6 +423,9 @@ def to_snapshot step_job_ids: registration.step_job_ids, dependency_job_ids_by_job_id: registration.dependency_job_ids_by_job_id, jobs:, + child_workflow_ids_by_step_id: registration.child_workflow_ids_by_step_id, + child_workflows: child_workflow_snapshots, + parent: parent_snapshot, rollback: rollback_snapshot ) end @@ -440,6 +440,51 @@ def rollback_snapshot RollbackSnapshotAttributes.new(rollback.to_h).to_snapshot end + + def child_workflow_snapshots + state.workflow_children.for_parent_batch(batch.id).map do |relationship| + ChildWorkflowSnapshotBuilder.new(relationship:, resolver: child_state_resolver).to_snapshot + end.freeze + end + + def parent_snapshot + relationship = state.workflow_children.for_child_batch(batch.id) + return unless relationship + + ChildWorkflowSnapshotBuilder.new(relationship:, resolver: child_state_resolver).to_snapshot + end + + def child_state_resolver + @child_state_resolver ||= WorkflowChildState.new(state:, now:) + end + end + + # Builds public child workflow relationship snapshots from store metadata. + class ChildWorkflowSnapshotBuilder + def initialize(relationship:, resolver:) + @relationship = relationship + @resolver = resolver + end + + def to_snapshot + Workflow::ChildWorkflowSnapshot.new( + parent_workflow_id: relationship.parent_workflow_id, + parent_batch_id: relationship.parent_batch_id, + parent_step_id: relationship.parent_step_id, + parent_job_id: relationship.parent_job_id, + child_workflow_id: relationship.child_workflow_id, + child_batch_id: relationship.child_batch_id, + child_state: + ) + end + + private + + attr_reader :relationship, :resolver + + def child_state + resolver.resolve(relationship.child_batch_id) + end end # Converts owner-local rollback storage into public workflow inspection. @@ -518,10 +563,11 @@ def validation_error_message "workflow batch #{batch_id} has active jobs and cannot be rolled back" end end - private_constant :RollbackSnapshotAttributes, :RollbackState, :WorkflowSnapshotBuilder + private_constant :ChildWorkflowSnapshotBuilder, :RollbackSnapshotAttributes, :RollbackState, :WorkflowSnapshotBuilder - def workflow_dependencies_satisfied?(job) - prerequisite_job_ids = state.workflow_dependency_job_ids_by_job_id[job.id] + def workflow_dependencies_satisfied?(job, now:) + prerequisite_job_ids = state.workflow_dependency_job_ids_for(job.id) + return false unless workflow_child_satisfied?(job, now:) return true unless prerequisite_job_ids prerequisite_job_ids.all? do |prerequisite_job_id| @@ -529,6 +575,18 @@ def workflow_dependencies_satisfied?(job) prerequisite_job && prerequisite_job.state == :succeeded end end + + def workflow_child_satisfied?(job, now:) + job_id = job.id + workflow_children = state.workflow_children + child_workflow_id = workflow_children.expected_child_workflow_id_by_job_id[job_id] + return true unless child_workflow_id + + relationship = workflow_children.for_parent_job(job_id) + return false unless relationship + + child_workflow_state(relationship.child_batch_id, now:) == :succeeded + end end end end diff --git a/core/karya/lib/karya/workflow.rb b/core/karya/lib/karya/workflow.rb index 7e0fc64d..4b9acbed 100644 --- a/core/karya/lib/karya/workflow.rb +++ b/core/karya/lib/karya/workflow.rb @@ -9,6 +9,7 @@ require_relative 'workflow/batch' require_relative 'workflow/batch_snapshot' require_relative 'workflow/catalog' +require_relative 'workflow/child_workflow_snapshot' require_relative 'workflow/dependency' require_relative 'workflow/definition' require_relative 'workflow/execution_binding' @@ -75,8 +76,8 @@ def initialize(id) @steps = [] end - def step(id, handler:, arguments: {}, depends_on: nil, compensate_with: nil, compensation_arguments: {}) - steps << Step.new(id:, handler:, arguments:, depends_on:, compensate_with:, compensation_arguments:) + def step(id, handler:, arguments: {}, depends_on: nil, compensate_with: nil, compensation_arguments: {}, child_workflow: nil) + steps << Step.new(id:, handler:, arguments:, depends_on:, compensate_with:, compensation_arguments:, child_workflow:) nil end diff --git a/core/karya/lib/karya/workflow/child_workflow_snapshot.rb b/core/karya/lib/karya/workflow/child_workflow_snapshot.rb new file mode 100644 index 00000000..eba4b575 --- /dev/null +++ b/core/karya/lib/karya/workflow/child_workflow_snapshot.rb @@ -0,0 +1,101 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +module Karya + module Workflow + # Immutable inspection view of one parent-child workflow relationship. + class ChildWorkflowSnapshot + WORKFLOW_STATES = %i[pending running blocked succeeded failed cancelled].freeze + + attr_reader :child_batch_id, + :child_state, + :child_workflow_id, + :parent_batch_id, + :parent_job_id, + :parent_step_id, + :parent_workflow_id + + def initialize(**attributes) + attributes = Attributes.new(attributes) + @parent_workflow_id = attributes.parent_workflow_id + @parent_batch_id = attributes.parent_batch_id + @parent_step_id = attributes.parent_step_id + @parent_job_id = attributes.parent_job_id + @child_workflow_id = attributes.child_workflow_id + @child_batch_id = attributes.child_batch_id + @child_state = attributes.child_state + freeze + end + + # Validates and exposes child workflow relationship attributes. + class Attributes + REQUIRED_ATTRIBUTES = %i[ + parent_workflow_id + parent_batch_id + parent_step_id + parent_job_id + child_workflow_id + child_batch_id + child_state + ].freeze + + def initialize(attributes) + @attributes = attributes + validate_keys + end + + def parent_workflow_id + Workflow.send(:normalize_identifier, :parent_workflow_id, fetch(:parent_workflow_id)) + end + + def parent_batch_id + Workflow.send(:normalize_batch_identifier, :parent_batch_id, fetch(:parent_batch_id)) + end + + def parent_step_id + Workflow.send(:normalize_execution_identifier, :parent_step_id, fetch(:parent_step_id)) + end + + def parent_job_id + Workflow.send(:normalize_execution_identifier, :parent_job_id, fetch(:parent_job_id)) + end + + def child_workflow_id + Workflow.send(:normalize_identifier, :child_workflow_id, fetch(:child_workflow_id)) + end + + def child_batch_id + Workflow.send(:normalize_batch_identifier, :child_batch_id, fetch(:child_batch_id)) + end + + def child_state + state = fetch(:child_state) + return state if WORKFLOW_STATES.include?(state) + + raise InvalidExecutionError, 'child_state must be a workflow state' + end + + private + + attr_reader :attributes + + def fetch(name) + attributes.fetch(name) { raise ArgumentError, "missing keyword: :#{name}" } + end + + def validate_keys + unknown_keys = attributes.keys - REQUIRED_ATTRIBUTES + return if unknown_keys.empty? + + raise ArgumentError, "unknown keyword: :#{unknown_keys.first}" + end + end + + private_constant :Attributes, :WORKFLOW_STATES + end + end +end diff --git a/core/karya/lib/karya/workflow/definition.rb b/core/karya/lib/karya/workflow/definition.rb index 1502f265..dbbd69b6 100644 --- a/core/karya/lib/karya/workflow/definition.rb +++ b/core/karya/lib/karya/workflow/definition.rb @@ -9,12 +9,8 @@ module Karya module Workflow # Immutable normalized workflow definition built from ordered steps. class Definition - attr_reader :compensable_step_ids, - :dependencies, + attr_reader :dependencies, :id, - :leaf_step_ids, - :root_step_ids, - :step_ids, :steps def initialize(id:, steps:) @@ -24,17 +20,21 @@ def initialize(id:, steps:) graph = Graph.new(steps) @steps = graph.steps @steps_by_id = @steps.to_h { |workflow_step| [workflow_step.id, workflow_step] }.freeze - inspection = graph.inspection - @step_ids = inspection.step_ids + @inspection = graph.inspection @dependencies = graph.dependencies - @dependencies_by_step_id = inspection.dependencies_by_step_id - @dependents_by_step_id = inspection.dependents_by_step_id - @root_step_ids = inspection.root_step_ids - @leaf_step_ids = inspection.leaf_step_ids - @compensable_step_ids = inspection.compensable_step_ids freeze end + def step_ids = inspection.step_ids + + def root_step_ids = inspection.root_step_ids + + def leaf_step_ids = inspection.leaf_step_ids + + def compensable_step_ids = inspection.compensable_step_ids + + def child_step_ids = inspection.child_step_ids + def step(step_id) normalized_step_id = Workflow.send(:normalize_identifier, :step_id, step_id) steps_by_id[normalized_step_id] @@ -49,17 +49,17 @@ def fetch_step(step_id) def dependencies_for(step_id) workflow_step = fetch_step(step_id) - dependencies_by_step_id.fetch(workflow_step.id) + inspection.dependencies_by_step_id.fetch(workflow_step.id) end def dependents_for(step_id) workflow_step = fetch_step(step_id) - dependents_by_step_id.fetch(workflow_step.id) + inspection.dependents_by_step_id.fetch(workflow_step.id) end private - attr_reader :dependencies_by_step_id, :dependents_by_step_id, :steps_by_id + attr_reader :inspection, :steps_by_id # Owner-local graph normalizer and validator for workflow step composition. class Graph @@ -175,6 +175,7 @@ def to_a # Builds definition inspection indexes from normalized ordered steps. class Inspection attr_reader :compensable_step_ids, + :child_step_ids, :dependencies_by_step_id, :dependents_by_step_id, :leaf_step_ids, @@ -189,6 +190,7 @@ def initialize(steps) @root_step_ids = StepFilter.new(steps).root_ids @leaf_step_ids = StepFilter.new(steps).leaf_ids(@dependents_by_step_id) @compensable_step_ids = StepFilter.new(steps).compensable_ids + @child_step_ids = StepFilter.new(steps).child_ids freeze end @@ -254,6 +256,10 @@ def compensable_ids steps.filter_map { |workflow_step| StepEntry.new(workflow_step).compensable_id }.freeze end + def child_ids + steps.filter_map { |workflow_step| StepEntry.new(workflow_step).child_id }.freeze + end + private attr_reader :steps @@ -300,6 +306,10 @@ def compensable_id id if workflow_step.compensable? end + def child_id + id if workflow_step.child_workflow? + end + private attr_reader :workflow_step diff --git a/core/karya/lib/karya/workflow/snapshot.rb b/core/karya/lib/karya/workflow/snapshot.rb index 5a51d5ee..0eafe4d8 100644 --- a/core/karya/lib/karya/workflow/snapshot.rb +++ b/core/karya/lib/karya/workflow/snapshot.rb @@ -20,16 +20,18 @@ class Snapshot dependency_job_ids_by_job_id jobs ].freeze - OPTIONAL_ATTRIBUTES = %i[rollback].freeze + OPTIONAL_ATTRIBUTES = %i[child_workflow_ids_by_step_id child_workflows parent rollback].freeze SUPPORTED_ATTRIBUTES = (REQUIRED_ATTRIBUTES + OPTIONAL_ATTRIBUTES).freeze def initialize(**attributes) attributes = Attributes.new(attributes) @identity = attributes.identity @membership = attributes.membership - @step_inspection = StepInspection.new(identity:, membership:) + @child_relationships = attributes.child_relationships + @step_inspection = StepInspection.new(identity:, membership:, child_relationships:) + @parent = attributes.parent @rollback = attributes.rollback - @summary_data = SummaryData.new(membership) + @summary_data = SummaryData.new(membership, child_relationships) freeze end @@ -85,6 +87,18 @@ def rollback_requested? !!rollback end + def child_workflows + child_relationships.child_workflows + end + + def child_workflow(step_id) + child_relationships.child_workflow(step_id) + end + + def fetch_child_workflow(step_id) + child_relationships.fetch_child_workflow(step_id) + end + def state_counts summary_data.state_counts end @@ -105,7 +119,7 @@ def state summary_data.state end - attr_reader :rollback + attr_reader :parent, :rollback # Validates and exposes snapshot construction attributes. class Attributes @@ -141,6 +155,20 @@ def rollback value end + def child_relationships + ChildRelationships.new( + child_workflow_ids_by_step_id: ChildWorkflowIds.new(attributes.fetch(:child_workflow_ids_by_step_id, {})).to_h, + child_workflows: ChildWorkflowList.new(attributes.fetch(:child_workflows, [])).to_a + ) + end + + def parent + value = attributes.fetch(:parent, nil) + raise InvalidExecutionError, 'parent must be Karya::Workflow::ChildWorkflowSnapshot' if value && !value.is_a?(ChildWorkflowSnapshot) + + value + end + private attr_reader :attributes @@ -153,6 +181,55 @@ def validate_keys end end + # Groups normalized parent and child workflow relationship metadata. + class ChildRelationships + attr_reader :child_workflow_ids_by_step_id, :child_workflows, :child_workflows_by_step_id + + def initialize(child_workflow_ids_by_step_id:, child_workflows:) + @child_workflow_ids_by_step_id = child_workflow_ids_by_step_id + @child_workflows = child_workflows + validate_relationships + @child_workflows_by_step_id = child_workflows.to_h { |child_workflow| [child_workflow.parent_step_id, child_workflow] }.freeze + freeze + end + + def child_workflow_id(step_id) + normalized_step_id = Workflow.send(:normalize_execution_identifier, :step_id, step_id) + child_workflow_ids_by_step_id[normalized_step_id] + end + + def child_workflow(step_id) + normalized_step_id = Workflow.send(:normalize_execution_identifier, :step_id, step_id) + child_workflows_by_step_id[normalized_step_id] + end + + def fetch_child_workflow(step_id) + normalized_step_id = Workflow.send(:normalize_execution_identifier, :step_id, step_id) + child_workflows_by_step_id.fetch(normalized_step_id) do + raise InvalidExecutionError, "unknown child workflow for step #{normalized_step_id.inspect}" + end + end + + private + + def validate_relationships + seen_parent_step_ids = {} + + child_workflows.each do |child_workflow| + parent_step_id = child_workflow.parent_step_id + inspected_parent_step_id = parent_step_id.inspect + raise InvalidExecutionError, "duplicate child workflow for step #{inspected_parent_step_id}" if seen_parent_step_ids.key?(parent_step_id) + + seen_parent_step_ids[parent_step_id] = true + expected_workflow_id = child_workflow_ids_by_step_id[parent_step_id] + raise InvalidExecutionError, "unknown child workflow step #{inspected_parent_step_id}" unless expected_workflow_id + next if expected_workflow_id == child_workflow.child_workflow_id + + raise InvalidExecutionError, 'child workflow relationship id must match declared child workflow id' + end + end + end + # Groups normalized snapshot identity fields. class Identity attr_reader :batch_id, :captured_at, :workflow_id @@ -199,9 +276,10 @@ def build_step_states # Builds ordered per-step runtime inspection values. class StepInspection - def initialize(identity:, membership:) + def initialize(identity:, membership:, child_relationships:) @identity = identity @membership = membership + @child_relationships = child_relationships @steps = build_steps @steps_by_id = @steps.to_h { |step_snapshot| [step_snapshot.step_id, step_snapshot] }.freeze freeze @@ -223,7 +301,7 @@ def fetch_step(step_id) private - attr_reader :identity, :membership, :steps_by_id + attr_reader :child_relationships, :identity, :membership, :steps_by_id def build_steps membership.step_job_ids.map do |step_id, job_id| @@ -235,7 +313,9 @@ def build_steps job_id:, job: membership.jobs_by_id.fetch(job_id), prerequisite_job_ids:, - prerequisite_states: prerequisite_states_for(prerequisite_job_ids) + prerequisite_states: prerequisite_states_for(prerequisite_job_ids), + child_workflow_id: child_relationships.child_workflow_id(step_id), + child_workflow: child_relationships.child_workflow(step_id) ) end.freeze end @@ -248,11 +328,53 @@ def prerequisite_states_for(prerequisite_job_ids) end end + # Normalizes child workflow declarations by parent step id. + class ChildWorkflowIds + def initialize(child_workflow_ids_by_step_id) + @child_workflow_ids_by_step_id = child_workflow_ids_by_step_id + end + + def to_h + raise InvalidExecutionError, 'child_workflow_ids_by_step_id must be a Hash' unless child_workflow_ids_by_step_id.is_a?(Hash) + + child_workflow_ids_by_step_id.each_with_object({}) do |(step_id, child_workflow_id), normalized| + normalized_step_id = Workflow.send(:normalize_execution_identifier, :step_id, step_id) + normalized[normalized_step_id] = Workflow.send(:normalize_identifier, :child_workflow_id, child_workflow_id) + end.freeze + end + + private + + attr_reader :child_workflow_ids_by_step_id + end + + # Normalizes child workflow relationship snapshots. + class ChildWorkflowList + def initialize(child_workflows) + @child_workflows = child_workflows + end + + def to_a + raise InvalidExecutionError, 'child_workflows must be an Array' unless child_workflows.is_a?(Array) + + child_workflows.each do |child_workflow| + unless child_workflow.is_a?(ChildWorkflowSnapshot) + raise InvalidExecutionError, 'child_workflows entries must be Karya::Workflow::ChildWorkflowSnapshot' + end + end + child_workflows.dup.freeze + end + + private + + attr_reader :child_workflows + end + # Groups snapshot state summary fields. class SummaryData attr_reader :completed_count, :failed_count, :state, :state_counts, :total_count - def initialize(membership) + def initialize(membership, child_relationships) jobs = membership.jobs summary = Summary.new(jobs) @state_counts = summary.state_counts @@ -261,7 +383,9 @@ def initialize(membership) @failed_count = summary.failed_count @state = State.new( jobs:, - dependency_job_ids_by_job_id: membership.dependency_job_ids_by_job_id + step_job_ids: membership.step_job_ids, + dependency_job_ids_by_job_id: membership.dependency_job_ids_by_job_id, + child_relationships: ).to_sym freeze end @@ -411,10 +535,13 @@ def active? # Derives workflow state from current job states and prerequisites. class State - def initialize(jobs:, dependency_job_ids_by_job_id:) + def initialize(jobs:, step_job_ids:, dependency_job_ids_by_job_id:, child_relationships:) @jobs = jobs + @step_job_ids = step_job_ids @dependency_job_ids_by_job_id = dependency_job_ids_by_job_id + @child_relationships = child_relationships @jobs_by_id = jobs.to_h { |job| [job.id, job] } + @step_id_by_job_id = step_job_ids.to_h { |step_id, job_id| [job_id, step_id] } end def to_sym @@ -431,7 +558,7 @@ def to_sym private - attr_reader :dependency_job_ids_by_job_id, :jobs, :jobs_by_id + attr_reader :child_relationships, :dependency_job_ids_by_job_id, :jobs, :jobs_by_id, :step_id_by_job_id def failed? jobs.any? { |job| FAILED_STATES.include?(job.state) } @@ -455,7 +582,7 @@ def progressed? def blocked? jobs.any? do |job| - WAITING_STATES.include?(job.state) && dependency_blocked?(job) + WAITING_STATES.include?(job.state) && (dependency_blocked?(job) || child_workflow_blocked?(job)) end end @@ -465,9 +592,23 @@ def dependency_blocked?(job) !dependency_job || dependency_job.state != :succeeded end end + + def child_workflow_blocked?(job) + step_id = step_id_by_job_id.fetch(job.id) + child_workflow_id = child_relationships.child_workflow_id(step_id) + return false unless child_workflow_id + + child_workflow = child_relationships.child_workflow(step_id) + return true unless child_workflow + + child_workflow.child_state != :succeeded + end end private_constant :Attributes, + :ChildRelationships, + :ChildWorkflowIds, + :ChildWorkflowList, :COMPLETED_STATES, :DependencyJobIdList, :DependencyJobIds, @@ -489,7 +630,7 @@ def dependency_blocked?(job) private - attr_reader :identity, :membership, :step_inspection, :summary_data + attr_reader :child_relationships, :identity, :membership, :step_inspection, :summary_data end end end diff --git a/core/karya/lib/karya/workflow/step.rb b/core/karya/lib/karya/workflow/step.rb index 198150a0..66cb8b44 100644 --- a/core/karya/lib/karya/workflow/step.rb +++ b/core/karya/lib/karya/workflow/step.rb @@ -12,6 +12,7 @@ module Workflow # Immutable one-step workflow composition unit. class Step attr_reader :arguments, + :child_workflow, :compensate_with, :compensation_arguments, :depends_on, @@ -24,6 +25,7 @@ def initialize(id:, handler:, **options) normalized_options = Options.new(options) @arguments = Arguments.new(normalized_options.arguments, step_id: @id, handler: @handler).normalize @depends_on = Dependencies.new(normalized_options.depends_on).normalize + @child_workflow = ChildWorkflow.new(normalized_options.child_workflow).normalize @compensate_with = CompensationHandler.new(normalized_options.compensate_with).normalize @compensation_arguments = Arguments.new( normalized_options.compensation_arguments, @@ -38,9 +40,13 @@ def compensable? !!compensate_with end + def child_workflow? + !!child_workflow + end + # Centralizes optional constructor field defaults and key validation. class Options - ALLOWED_KEYS = %i[arguments depends_on compensate_with compensation_arguments].freeze + ALLOWED_KEYS = %i[arguments depends_on compensate_with compensation_arguments child_workflow].freeze def initialize(options) @options = options @@ -59,6 +65,10 @@ def compensate_with options.fetch(:compensate_with, nil) end + def child_workflow + options.fetch(:child_workflow, nil) + end + def compensation_arguments options.fetch(:compensation_arguments, {}) end @@ -86,6 +96,26 @@ def formatted_unexpected_keys end end + # Normalizes an optional child workflow id. + class ChildWorkflow + def initialize(value) + @value = value + end + + def normalize + case value + when NilClass + nil + else + Workflow.send(:normalize_identifier, :child_workflow, value) + end + end + + private + + attr_reader :value + end + # Normalizes workflow step arguments into the same immutable scalar graph # shape used by jobs without coupling workflow code to job internals. class Arguments @@ -155,7 +185,7 @@ def normalize attr_reader :value end - private_constant :Arguments, :CompensationHandler, :Dependencies, :Options + private_constant :Arguments, :ChildWorkflow, :CompensationHandler, :Dependencies, :Options private diff --git a/core/karya/lib/karya/workflow/step_snapshot.rb b/core/karya/lib/karya/workflow/step_snapshot.rb index d2972710..f1816115 100644 --- a/core/karya/lib/karya/workflow/step_snapshot.rb +++ b/core/karya/lib/karya/workflow/step_snapshot.rb @@ -12,6 +12,8 @@ class StepSnapshot WAITING_STATES = %i[queued submission].freeze attr_reader :batch_id, + :child_workflow, + :child_workflow_id, :job, :job_id, :prerequisite_job_ids, @@ -33,11 +35,19 @@ def initialize(**attributes) prerequisite_job_ids: @prerequisite_job_ids, prerequisite_states: attributes.prerequisite_states ).to_h + @child_workflow_id = attributes.child_workflow_id + @child_workflow = ChildWorkflow.new( + child_workflow: attributes.child_workflow, + child_workflow_id: @child_workflow_id, + parent_batch_id: @batch_id, + parent_step_id: @step_id, + parent_job_id: @job_id + ).to_snapshot freeze end def ready? - waiting? && prerequisite_job_ids.all? { |prerequisite_job_id| prerequisite_states.fetch(prerequisite_job_id) == :succeeded } + waiting? && prerequisites_succeeded? && child_workflow_succeeded? end def blocked? @@ -52,6 +62,10 @@ def terminal? job.terminal? end + def child_workflow? + !!child_workflow_id + end + # Validates and exposes step snapshot construction attributes. class Attributes REQUIRED_ATTRIBUTES = %i[ @@ -63,6 +77,8 @@ class Attributes prerequisite_job_ids prerequisite_states ].freeze + OPTIONAL_ATTRIBUTES = %i[child_workflow_id child_workflow].freeze + SUPPORTED_ATTRIBUTES = (REQUIRED_ATTRIBUTES + OPTIONAL_ATTRIBUTES).freeze def initialize(attributes) @attributes = attributes @@ -97,6 +113,17 @@ def prerequisite_states fetch(:prerequisite_states) end + def child_workflow_id + value = attributes.fetch(:child_workflow_id, nil) + return unless value + + Workflow.send(:normalize_identifier, :child_workflow_id, value) + end + + def child_workflow + attributes.fetch(:child_workflow, nil) + end + private attr_reader :attributes @@ -106,13 +133,45 @@ def fetch(name) end def validate_keys - unknown_keys = attributes.keys - REQUIRED_ATTRIBUTES + unknown_keys = attributes.keys - SUPPORTED_ATTRIBUTES return if unknown_keys.empty? raise ArgumentError, "unknown keyword: :#{unknown_keys.first}" end end + # Validates optional child workflow relationship metadata. + class ChildWorkflow + def initialize(child_workflow:, child_workflow_id:, parent_batch_id:, parent_step_id:, parent_job_id:) + @child_workflow = child_workflow + @child_workflow_id = child_workflow_id + @parent_batch_id = parent_batch_id + @parent_step_id = parent_step_id + @parent_job_id = parent_job_id + end + + def to_snapshot + return unless child_workflow + raise InvalidExecutionError, 'child_workflow must be Karya::Workflow::ChildWorkflowSnapshot' unless child_workflow.is_a?(ChildWorkflowSnapshot) + + validate_identity + child_workflow + end + + private + + attr_reader :child_workflow, :child_workflow_id, :parent_batch_id, :parent_job_id, :parent_step_id + + def validate_identity + raise InvalidExecutionError, 'child_workflow_id must match child workflow relationship' if child_workflow_id != child_workflow.child_workflow_id + raise InvalidExecutionError, 'child workflow parent batch must match step batch' unless parent_batch_id == child_workflow.parent_batch_id + raise InvalidExecutionError, 'child workflow parent step must match step id' unless parent_step_id == child_workflow.parent_step_id + return if parent_job_id == child_workflow.parent_job_id + + raise InvalidExecutionError, 'child workflow parent job must match step job' + end + end + # Validates the concrete job backing a step snapshot. class JobEntry def initialize(job_id:, job:) @@ -199,13 +258,24 @@ def validate_membership(normalized_states) end end - private_constant :Attributes, :JobEntry, :JobIdList, :PrerequisiteStates, :WAITING_STATES + private_constant :Attributes, :ChildWorkflow, :JobEntry, :JobIdList, :PrerequisiteStates, :WAITING_STATES private def waiting? WAITING_STATES.include?(state) end + + def prerequisites_succeeded? + prerequisite_job_ids.all? { |prerequisite_job_id| prerequisite_states.fetch(prerequisite_job_id) == :succeeded } + end + + def child_workflow_succeeded? + return true unless child_workflow_id + return false unless child_workflow + + child_workflow.child_state == :succeeded + end end end end diff --git a/core/karya/sig/karya.rbs b/core/karya/sig/karya.rbs index edc18ed5..a018986d 100644 --- a/core/karya/sig/karya.rbs +++ b/core/karya/sig/karya.rbs @@ -25,8 +25,24 @@ module Karya type normalized_state_name = String type normalized_state_name_array = Array[normalized_state_name] type normalized_transition_map = Hash[normalized_state_name, normalized_state_name_array] - type step_snapshot_attribute_value = state_name | Job | Array[state_name] | Hash[state_name, state_name?] - type workflow_snapshot_attribute_value = state_name | Time | Hash[state_name, state_name] | Hash[state_name, state_name_array] | Array[Job] | Workflow::RollbackSnapshot | nil + type child_workflow_snapshot_attribute_value = state_name | Karya::workflow_state + type step_snapshot_attribute_value = + state_name | + Job | + Array[state_name] | + Hash[state_name, state_name?] | + Workflow::ChildWorkflowSnapshot | + nil + type workflow_snapshot_attribute_value = + state_name | + Time | + Hash[state_name, state_name] | + Hash[state_name, state_name_array] | + Array[Job] | + Array[Workflow::ChildWorkflowSnapshot] | + Workflow::ChildWorkflowSnapshot | + Workflow::RollbackSnapshot | + nil type handler_parameter = [Symbol, Symbol?] type handler_parameters = Array[handler_parameter] type handler_dispatch_mode = :none | :positional_hash | :keywords @@ -186,7 +202,9 @@ module Karya :replay_dead_letter_jobs | :retry_dead_letter_jobs | :discard_dead_letter_jobs | + :enqueue_child_workflow | :rollback_workflow | + :sync_child_workflows | :retry_workflow_steps | :dead_letter_workflow_steps | :replay_workflow_steps | diff --git a/core/karya/sig/karya/queue_store/base.rbs b/core/karya/sig/karya/queue_store/base.rbs index ad082bb4..aa98ca02 100644 --- a/core/karya/sig/karya/queue_store/base.rbs +++ b/core/karya/sig/karya/queue_store/base.rbs @@ -23,6 +23,18 @@ module Karya def workflow_snapshot: (batch_id: Karya::state_name, now: Time) -> Karya::Workflow::Snapshot + def enqueue_child_workflow: ( + parent_batch_id: Karya::state_name, + parent_step_id: Karya::state_name, + definition: Karya::Workflow::Definition, + jobs_by_step_id: Hash[Karya::state_name, Job], + batch_id: Karya::state_name, + now: Time, + ?compensation_jobs_by_step_id: Hash[Karya::state_name, Job] + ) -> BulkMutationReport + + def sync_child_workflows: (parent_batch_id: Karya::state_name, now: Time) -> BulkMutationReport + def rollback_workflow: (batch_id: Karya::state_name, now: Time, reason: String) -> BulkMutationReport def retry_workflow_steps: (batch_id: Karya::state_name, step_ids: Array[Karya::state_name], now: Time) -> BulkMutationReport diff --git a/core/karya/sig/karya/queue_store/in_memory.rbs b/core/karya/sig/karya/queue_store/in_memory.rbs index 3d691e98..4a4a73ee 100644 --- a/core/karya/sig/karya/queue_store/in_memory.rbs +++ b/core/karya/sig/karya/queue_store/in_memory.rbs @@ -45,6 +45,16 @@ module Karya ?compensation_jobs_by_step_id: Hash[Karya::state_name, Job] ) -> BulkMutationReport def workflow_snapshot: (batch_id: Karya::state_name, now: Time) -> Karya::Workflow::Snapshot + def enqueue_child_workflow: ( + parent_batch_id: Karya::state_name, + parent_step_id: Karya::state_name, + definition: Karya::Workflow::Definition, + jobs_by_step_id: Hash[Karya::state_name, Job], + batch_id: Karya::state_name, + now: Time, + ?compensation_jobs_by_step_id: Hash[Karya::state_name, Job] + ) -> BulkMutationReport + def sync_child_workflows: (parent_batch_id: Karya::state_name, now: Time) -> BulkMutationReport def rollback_workflow: (batch_id: Karya::state_name, now: Time, reason: String) -> BulkMutationReport def retry_workflow_steps: (batch_id: Karya::state_name, step_ids: Array[Karya::state_name], now: Time) -> BulkMutationReport def dead_letter_workflow_steps: ( @@ -172,7 +182,8 @@ module Karya now: Time ) { (String, Array[Job], Array[Karya::bulk_skipped_job]) -> void } -> BulkMutationReport def workflow_control_job_ids: (Karya::state_name batch_id, Array[Karya::state_name] step_ids) -> Array[String] - def workflow_dependencies_satisfied?: (Job job) -> bool + def workflow_dependencies_satisfied?: (Job job, now: Time) -> bool + def workflow_child_satisfied?: (Job job, now: Time) -> bool end end end diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/child_workflow_support.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/child_workflow_support.rbs new file mode 100644 index 00000000..9fc35b91 --- /dev/null +++ b/core/karya/sig/karya/queue_store/in_memory/internal/child_workflow_support.rbs @@ -0,0 +1,142 @@ +module Karya + module QueueStore + class InMemory + module Internal + module ChildWorkflowSupport + def enqueue_child_workflow: ( + parent_batch_id: Karya::state_name, + parent_step_id: Karya::state_name, + definition: Karya::Workflow::Definition, + jobs_by_step_id: Hash[Karya::state_name, Job], + batch_id: Karya::state_name, + now: Time, + ?compensation_jobs_by_step_id: Hash[Karya::state_name, Job] + ) -> BulkMutationReport + def sync_child_workflows: (parent_batch_id: Karya::state_name, now: Time) -> BulkMutationReport + + private + + class ChildWorkflowRequest + @now: Time + @parent_batch_id: String + @parent_step_id: String + + attr_reader now: Time + attr_reader parent_batch_id: String + attr_reader parent_step_id: String + + def initialize: (parent_batch_id: String, parent_step_id: String, now: Time) -> void + end + + class ChildWorkflowSyncRequest + @now: Time + @parent_batch_id: String + + attr_reader now: Time + attr_reader parent_batch_id: String + + def initialize: (parent_batch_id: String, now: Time) -> void + end + + class ParentChildWorkflow + @parent_workflow_id: String + @parent_batch_id: String + @parent_job_id: String + + attr_reader parent_workflow_id: String + attr_reader parent_batch_id: String + attr_reader parent_job_id: String + end + + class ChildStepJobIds + @definition: Karya::Workflow::Definition + @jobs: Array[Job] + + def initialize: (definition: Karya::Workflow::Definition, jobs: Array[Job]) -> void + def to_h: () -> Hash[String, String] + + private + + attr_reader definition: Karya::Workflow::Definition + attr_reader jobs: Array[Job] + end + + class ChildWorkflowMetadata + @state: InMemory::Internal::StoreState + @parent: ParentChildWorkflow + @parent_step_id: String + @binding: Karya::Workflow::ExecutionBinding + @definition: Karya::Workflow::Definition + + def initialize: ( + state: InMemory::Internal::StoreState, + parent: ParentChildWorkflow, + parent_step_id: String, + binding: Karya::Workflow::ExecutionBinding, + definition: Karya::Workflow::Definition + ) -> void + def register: () -> InMemory::Internal::StoreState::WorkflowChildren::Relationship + + private + + attr_reader state: InMemory::Internal::StoreState + attr_reader parent: ParentChildWorkflow + attr_reader parent_step_id: String + attr_reader binding: Karya::Workflow::ExecutionBinding + attr_reader definition: Karya::Workflow::Definition + end + + class ChildWorkflowReport + @binding: Karya::Workflow::ExecutionBinding + @queued_jobs: Array[Job] + @now: Time + + def initialize: (binding: Karya::Workflow::ExecutionBinding, queued_jobs: Array[Job], now: Time) -> void + def to_report: () -> BulkMutationReport + + private + + attr_reader binding: Karya::Workflow::ExecutionBinding + attr_reader queued_jobs: Array[Job] + attr_reader now: Time + end + + def enqueue_child_workflow_binding: ( + parent: ParentChildWorkflow, + parent_step_id: String, + binding: Karya::Workflow::ExecutionBinding, + definition: Karya::Workflow::Definition, + now: Time + ) -> BulkMutationReport + def prepare_child_workflow_parent: ( + parent_batch_id: String, + parent_step_id: String, + definition: Karya::Workflow::Definition + ) -> ParentChildWorkflow + def validate_child_workflow_parent_step: (String parent_step_id, String? expected_child_workflow_id) -> void + def validate_child_workflow_definition: ( + Karya::Workflow::Definition definition, + String expected_child_workflow_id, + String parent_step_id + ) -> void + def validate_child_workflow_not_registered: (String parent_batch_id, String parent_step_id) -> void + def validate_child_workflow_parent_job: ( + InMemory::Internal::StoreState::WorkflowRegistration parent_registration, + String parent_step_id, + String parent_batch_id + ) -> ParentChildWorkflow + def validate_child_batch_identity: (parent_batch_id: String, child_batch_id: String) -> void + def child_relationship: (String parent_batch_id, String parent_step_id) -> InMemory::Internal::StoreState::WorkflowChildren::Relationship? + def child_relationships_for_parent_batch: (String parent_batch_id) -> Array[InMemory::Internal::StoreState::WorkflowChildren::Relationship] + def sync_child_workflow_job: ( + job_id: String, + now: Time, + changed_jobs: Array[Job], + skipped_jobs: Array[Karya::bulk_skipped_job] + ) -> void + def child_workflow_state: (String child_batch_id, now: Time) -> Karya::workflow_state + end + end + end + end +end diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs index 10aa1e36..bb297e11 100644 --- a/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs +++ b/core/karya/sig/karya/queue_store/in_memory/internal/store_state.rbs @@ -29,6 +29,7 @@ module Karya @stuck_job_recoveries_by_id: Hash[String, Karya::stuck_job_recovery] @terminal_batch_ids_index: Hash[String, bool] @terminal_batch_ids_in_order: Array[String] + @workflow_children: WorkflowChildren @workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] @workflow_rollback_batch_ids: Hash[String, bool] @workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration] @@ -53,6 +54,7 @@ module Karya attr_reader reservation_tokens_in_order: Array[String] attr_reader reservations_by_token: Hash[String, Reservation] attr_reader stuck_job_recoveries_by_id: Hash[String, Karya::stuck_job_recovery] + attr_reader workflow_children: WorkflowChildren attr_reader workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] attr_reader workflow_rollback_batch_ids: Hash[String, bool] attr_reader workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration] @@ -84,12 +86,15 @@ module Karya def reservation_token_in_use?: (String reservation_token) -> bool def register_batch: (Karya::Workflow::Batch batch) -> Karya::Workflow::Batch def prune_terminal_batches: (Integer retention_limit, ?changed_job: Job?) -> Array[String] + def register_workflow_dependencies: (Hash[String, Array[String]] dependency_job_ids_by_job_id) -> Hash[String, Array[String]] + def workflow_dependency_job_ids_for: (String job_id) -> Array[String]? def register_workflow: ( batch_id: String, workflow_id: String, step_job_ids: Hash[String, String], dependency_job_ids_by_job_id: Hash[String, Array[String]], - compensation_jobs_by_step_id: Hash[String, Job] + compensation_jobs_by_step_id: Hash[String, Job], + ?child_workflow_ids_by_step_id: Hash[String, String] ) -> WorkflowRegistration def register_workflow_rollback: ( batch_id: String, @@ -108,18 +113,21 @@ module Karya @step_job_ids: Hash[String, String] @dependency_job_ids_by_job_id: Hash[String, Array[String]] @compensation_jobs_by_step_id: Hash[String, Job] + @child_workflow_ids_by_step_id: Hash[String, String] def initialize: ( String workflow_id, Hash[String, String] step_job_ids, Hash[String, Array[String]] dependency_job_ids_by_job_id, - Hash[String, Job] compensation_jobs_by_step_id + Hash[String, Job] compensation_jobs_by_step_id, + Hash[String, String] child_workflow_ids_by_step_id ) -> void attr_reader workflow_id: String attr_reader step_job_ids: Hash[String, String] attr_reader dependency_job_ids_by_job_id: Hash[String, Array[String]] attr_reader compensation_jobs_by_step_id: Hash[String, Job] + attr_reader child_workflow_ids_by_step_id: Hash[String, String] end class PrunedBatchCleanup @@ -130,6 +138,7 @@ module Karya workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] } @workflow_indexes: { + workflow_children: WorkflowChildren, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -143,6 +152,7 @@ module Karya workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] }, workflow_indexes: { + workflow_children: WorkflowChildren, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -156,6 +166,7 @@ module Karya workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] }, workflow_indexes: { + workflow_children: WorkflowChildren, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -172,6 +183,7 @@ module Karya workflow_dependency_job_ids_by_job_id: Hash[String, Array[String]] } attr_reader workflow_indexes: { + workflow_children: WorkflowChildren, workflow_rollback_batch_ids: Hash[String, bool], workflow_registrations_by_batch_id: Hash[String, WorkflowRegistration], workflow_rollbacks_by_batch_id: Hash[String, WorkflowRollback] @@ -182,6 +194,7 @@ module Karya def cleanup_stale_batch_membership: () -> Array[String] def batch_id_by_job_id: () -> Hash[String, String] def workflow_dependency_job_ids_by_job_id: () -> Hash[String, Array[String]] + def workflow_children: () -> WorkflowChildren def workflow_rollback_batch_ids: () -> Hash[String, bool] def workflow_registrations_by_batch_id: () -> Hash[String, WorkflowRegistration] def workflow_rollbacks_by_batch_id: () -> Hash[String, WorkflowRollback] @@ -209,6 +222,58 @@ module Karya attr_reader compensation_job_ids: Array[String] end + class WorkflowChildren + @by_child_batch_id: Hash[String, WorkflowChildren::Relationship] + @by_parent_batch_id: Hash[String, Hash[String, WorkflowChildren::Relationship]] + @by_parent_job_id: Hash[String, WorkflowChildren::Relationship] + @expected_child_workflow_id_by_job_id: Hash[String, String] + + attr_reader expected_child_workflow_id_by_job_id: Hash[String, String] + + def initialize: () -> void + def register_expected_child: (String parent_job_id, String child_workflow_id) -> String + def register: ( + parent_workflow_id: String, + parent_batch_id: String, + parent_step_id: String, + parent_job_id: String, + child_workflow_id: String, + child_batch_id: String + ) -> WorkflowChildren::Relationship + def for_parent_step: (String parent_batch_id, String parent_step_id) -> WorkflowChildren::Relationship? + def for_parent_batch: (String parent_batch_id) -> Array[WorkflowChildren::Relationship] + def for_parent_job: (String parent_job_id) -> WorkflowChildren::Relationship? + def for_child_batch: (String child_batch_id) -> WorkflowChildren::Relationship? + def delete_by_parent_batch: (String parent_batch_id) -> Array[WorkflowChildren::Relationship] + def delete_by_child_batch: (String child_batch_id) -> WorkflowChildren::Relationship? + def delete_expected_children: (Array[String] parent_job_ids) -> Array[String] + + private + + def parent_relationships: (String parent_batch_id) -> Hash[String, WorkflowChildren::Relationship] + def ensure_parent_relationships: (String parent_batch_id) -> Hash[String, WorkflowChildren::Relationship] + def delete_relationship: ( + WorkflowChildren::Relationship relationship, + ?remove_parent_batch: bool + ) -> WorkflowChildren::Relationship + + class Relationship + @parent_workflow_id: String + @parent_batch_id: String + @parent_step_id: String + @parent_job_id: String + @child_workflow_id: String + @child_batch_id: String + + attr_reader parent_workflow_id: String + attr_reader parent_batch_id: String + attr_reader parent_step_id: String + attr_reader parent_job_id: String + attr_reader child_workflow_id: String + attr_reader child_batch_id: String + end + end + def trim_fair_queue_history: () -> void def prune_expired_reservation_tokens: () -> bool? end diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/workflow_child_ids.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_child_ids.rbs new file mode 100644 index 00000000..f844c9aa --- /dev/null +++ b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_child_ids.rbs @@ -0,0 +1,31 @@ +module Karya + module QueueStore + class InMemory + module Internal + class WorkflowChildIds + @definition: Karya::Workflow::Definition + + def initialize: (Karya::Workflow::Definition definition) -> void + def to_h: () -> Hash[String, String] + + private + + attr_reader definition: Karya::Workflow::Definition + + class StepEntry + @workflow_step: Karya::Workflow::Step + + def initialize: (Karya::Workflow::Step workflow_step) -> void + def store_in: (Hash[String, String] child_workflow_ids) -> String? + + private + + attr_reader workflow_step: Karya::Workflow::Step + def id: () -> String + def child_workflow: () -> String? + end + end + end + end + end +end diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/workflow_child_state.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_child_state.rbs new file mode 100644 index 00000000..8ec0f3d5 --- /dev/null +++ b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_child_state.rbs @@ -0,0 +1,85 @@ +module Karya + module QueueStore + class InMemory + module Internal + class WorkflowChildState + @store_state: InMemory::Internal::StoreState + @now: Time + @cache: Hash[String, Karya::workflow_state] + @visiting: Hash[String, bool] + + def initialize: ( + state: InMemory::Internal::StoreState, + now: Time, + ?cache: Hash[String, Karya::workflow_state], + ?visiting: Hash[String, bool] + ) -> void + def resolve: (String batch_id) -> Karya::workflow_state + + private + + attr_reader store_state: InMemory::Internal::StoreState + attr_reader now: Time + attr_reader cache: Hash[String, Karya::workflow_state] + attr_reader visiting: Hash[String, bool] + + class StateSnapshot + @batch_id: String + @store_state: InMemory::Internal::StoreState + @now: Time + @cache: Hash[String, Karya::workflow_state] + @visiting: Hash[String, bool] + + def initialize: ( + batch_id: String, + state: InMemory::Internal::StoreState, + now: Time, + cache: Hash[String, Karya::workflow_state], + visiting: Hash[String, bool] + ) -> void + def state: () -> Karya::workflow_state + + private + + attr_reader batch_id: String + attr_reader store_state: InMemory::Internal::StoreState + attr_reader now: Time + attr_reader cache: Hash[String, Karya::workflow_state] + attr_reader visiting: Hash[String, bool] + def batch: () -> Karya::Workflow::Batch + def registration: () -> InMemory::Internal::StoreState::WorkflowRegistration + def jobs: () -> Array[Job] + def child_workflows: () -> Array[Karya::Workflow::ChildWorkflowSnapshot] + + class RelationshipSnapshot + @relationship: InMemory::Internal::StoreState::WorkflowChildren::Relationship + @store_state: InMemory::Internal::StoreState + @now: Time + @cache: Hash[String, Karya::workflow_state] + @visiting: Hash[String, bool] + + def initialize: ( + relationship: InMemory::Internal::StoreState::WorkflowChildren::Relationship, + store_state: InMemory::Internal::StoreState, + now: Time, + cache: Hash[String, Karya::workflow_state], + visiting: Hash[String, bool] + ) -> void + def to_snapshot: () -> Karya::Workflow::ChildWorkflowSnapshot + + private + + attr_reader relationship: InMemory::Internal::StoreState::WorkflowChildren::Relationship + attr_reader store_state: InMemory::Internal::StoreState + attr_reader now: Time + attr_reader cache: Hash[String, Karya::workflow_state] + attr_reader visiting: Hash[String, bool] + def child_batch_id: () -> String + def child_state: () -> Karya::workflow_state + end + end + end + end + end + end +end diff --git a/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs index 462b27c8..3f14cfa5 100644 --- a/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs +++ b/core/karya/sig/karya/queue_store/in_memory/internal/workflow_support.rbs @@ -140,7 +140,8 @@ module Karya now: Time ) { (String, Array[Job], Array[Karya::bulk_skipped_job]) -> void } -> BulkMutationReport def workflow_control_job_ids: (Karya::state_name batch_id, Array[Karya::state_name] step_ids) -> Array[String] - def workflow_dependencies_satisfied?: (Job job) -> bool + + private class WorkflowSnapshotBuilder @batch: Karya::Workflow::Batch @@ -166,6 +167,26 @@ module Karya attr_reader now: Time attr_reader state: InMemory::Internal::StoreState def rollback_snapshot: () -> Karya::Workflow::RollbackSnapshot? + def child_workflow_snapshots: () -> Array[Karya::Workflow::ChildWorkflowSnapshot] + def parent_snapshot: () -> Karya::Workflow::ChildWorkflowSnapshot? + def child_state_resolver: () -> InMemory::Internal::WorkflowChildState + end + + class ChildWorkflowSnapshotBuilder + @relationship: InMemory::Internal::StoreState::WorkflowChildren::Relationship + @resolver: InMemory::Internal::WorkflowChildState + + def initialize: ( + relationship: InMemory::Internal::StoreState::WorkflowChildren::Relationship, + resolver: InMemory::Internal::WorkflowChildState + ) -> void + def to_snapshot: () -> Karya::Workflow::ChildWorkflowSnapshot + + private + + attr_reader relationship: InMemory::Internal::StoreState::WorkflowChildren::Relationship + attr_reader resolver: InMemory::Internal::WorkflowChildState + def child_state: () -> Karya::workflow_state end class RollbackSnapshotAttributes @@ -202,6 +223,9 @@ module Karya def dependencies_satisfied?: (Job job) -> bool def validation_error_message: () -> String end + + def workflow_dependencies_satisfied?: (Job job, now: Time) -> bool + def workflow_child_satisfied?: (Job job, now: Time) -> bool end end end diff --git a/core/karya/sig/karya/workflow.rbs b/core/karya/sig/karya/workflow.rbs index 9392a235..ecd85878 100644 --- a/core/karya/sig/karya/workflow.rbs +++ b/core/karya/sig/karya/workflow.rbs @@ -236,6 +236,57 @@ module Karya end end + class ChildWorkflowSnapshot + @parent_workflow_id: String + @parent_batch_id: String + @parent_step_id: String + @parent_job_id: String + @child_workflow_id: String + @child_batch_id: String + @child_state: Karya::workflow_state + + attr_reader parent_workflow_id: String + attr_reader parent_batch_id: String + attr_reader parent_step_id: String + attr_reader parent_job_id: String + attr_reader child_workflow_id: String + attr_reader child_batch_id: String + attr_reader child_state: Karya::workflow_state + + def initialize: ( + parent_workflow_id: state_name, + parent_batch_id: state_name, + parent_step_id: state_name, + parent_job_id: state_name, + child_workflow_id: state_name, + child_batch_id: state_name, + child_state: Karya::workflow_state + ) -> void + + private + + class Attributes + REQUIRED_ATTRIBUTES: Array[Symbol] + + @attributes: Hash[Symbol, child_workflow_snapshot_attribute_value] + + def initialize: (Hash[Symbol, child_workflow_snapshot_attribute_value] attributes) -> void + def parent_workflow_id: () -> String + def parent_batch_id: () -> String + def parent_step_id: () -> String + def parent_job_id: () -> String + def child_workflow_id: () -> String + def child_batch_id: () -> String + def child_state: () -> Karya::workflow_state + + private + + attr_reader attributes: Hash[Symbol, child_workflow_snapshot_attribute_value] + def fetch: (Symbol name) -> child_workflow_snapshot_attribute_value + def validate_keys: () -> void + end + end + class StepSnapshot @workflow_id: String @batch_id: String @@ -245,6 +296,8 @@ module Karya @state: state_name @prerequisite_job_ids: Array[String] @prerequisite_states: Hash[String, state_name?] + @child_workflow_id: String? + @child_workflow: ChildWorkflowSnapshot? attr_reader workflow_id: String attr_reader batch_id: String @@ -254,6 +307,8 @@ module Karya attr_reader state: state_name attr_reader prerequisite_job_ids: Array[String] attr_reader prerequisite_states: Hash[String, state_name?] + attr_reader child_workflow_id: String? + attr_reader child_workflow: ChildWorkflowSnapshot? def initialize: ( workflow_id: state_name, @@ -262,17 +317,22 @@ module Karya job_id: state_name, job: Job, prerequisite_job_ids: Array[state_name], - prerequisite_states: Hash[state_name, state_name?] + prerequisite_states: Hash[state_name, state_name?], + ?child_workflow_id: state_name?, + ?child_workflow: ChildWorkflowSnapshot? ) -> void def ready?: () -> bool def blocked?: () -> bool def active?: () -> bool def terminal?: () -> bool + def child_workflow?: () -> bool private class Attributes REQUIRED_ATTRIBUTES: Array[Symbol] + OPTIONAL_ATTRIBUTES: Array[Symbol] + SUPPORTED_ATTRIBUTES: Array[Symbol] @attributes: Hash[Symbol, step_snapshot_attribute_value] @@ -284,6 +344,8 @@ module Karya def job: () -> Job def prerequisite_job_ids: () -> Array[String] def prerequisite_states: () -> Hash[state_name, state_name?] + def child_workflow_id: () -> String? + def child_workflow: () -> ChildWorkflowSnapshot? private @@ -292,6 +354,32 @@ module Karya def validate_keys: () -> void end + class ChildWorkflow + @child_workflow: ChildWorkflowSnapshot? + @child_workflow_id: String? + @parent_batch_id: String + @parent_step_id: String + @parent_job_id: String + + def initialize: ( + child_workflow: ChildWorkflowSnapshot?, + child_workflow_id: String?, + parent_batch_id: String, + parent_step_id: String, + parent_job_id: String + ) -> void + def to_snapshot: () -> ChildWorkflowSnapshot? + + private + + attr_reader child_workflow: ChildWorkflowSnapshot? + attr_reader child_workflow_id: String? + attr_reader parent_batch_id: String + attr_reader parent_step_id: String + attr_reader parent_job_id: String + def validate_identity: () -> void + end + class JobEntry @job_id: String @job: Job @@ -335,13 +423,19 @@ module Karya def validate_membership: (Hash[String, state_name?] normalized_states) -> void end + private + def waiting?: () -> bool + def prerequisites_succeeded?: () -> bool + def child_workflow_succeeded?: () -> bool end class Snapshot @identity: Identity @membership: Membership + @child_relationships: ChildRelationships @step_inspection: StepInspection + @parent: ChildWorkflowSnapshot? @rollback: RollbackSnapshot? @summary_data: SummaryData @@ -352,6 +446,9 @@ module Karya step_job_ids: Hash[state_name, state_name], dependency_job_ids_by_job_id: Hash[state_name, Array[state_name]], jobs: Array[Job], + ?child_workflow_ids_by_step_id: Hash[state_name, state_name], + ?child_workflows: Array[ChildWorkflowSnapshot], + ?parent: ChildWorkflowSnapshot?, ?rollback: RollbackSnapshot? ) -> void @@ -368,6 +465,10 @@ module Karya def job_id_for_step: (state_name step_id) -> String def state_for_step: (state_name step_id) -> state_name def rollback_requested?: () -> bool + def child_workflows: () -> Array[ChildWorkflowSnapshot] + def child_workflow: (state_name step_id) -> ChildWorkflowSnapshot? + def fetch_child_workflow: (state_name step_id) -> ChildWorkflowSnapshot + def parent: () -> ChildWorkflowSnapshot? def rollback: () -> RollbackSnapshot? def state_counts: () -> Hash[state_name, Integer] def total_count: () -> Integer @@ -379,6 +480,7 @@ module Karya attr_reader identity: Identity attr_reader membership: Membership + attr_reader child_relationships: ChildRelationships attr_reader step_inspection: StepInspection attr_reader summary_data: SummaryData @@ -389,6 +491,8 @@ module Karya def fetch: (Symbol name) -> workflow_snapshot_attribute_value def identity: () -> Identity def membership: () -> Membership + def child_relationships: () -> ChildRelationships + def parent: () -> ChildWorkflowSnapshot? def rollback: () -> RollbackSnapshot? private @@ -397,6 +501,28 @@ module Karya def validate_keys: () -> void end + class ChildRelationships + @child_workflow_ids_by_step_id: Hash[String, String] + @child_workflows: Array[ChildWorkflowSnapshot] + @child_workflows_by_step_id: Hash[String, ChildWorkflowSnapshot] + + attr_reader child_workflow_ids_by_step_id: Hash[String, String] + attr_reader child_workflows: Array[ChildWorkflowSnapshot] + attr_reader child_workflows_by_step_id: Hash[String, ChildWorkflowSnapshot] + + def initialize: ( + child_workflow_ids_by_step_id: Hash[String, String], + child_workflows: Array[ChildWorkflowSnapshot] + ) -> void + def child_workflow_id: (state_name step_id) -> String? + def child_workflow: (state_name step_id) -> ChildWorkflowSnapshot? + def fetch_child_workflow: (state_name step_id) -> ChildWorkflowSnapshot + + private + + def validate_relationships: () -> void + end + class Identity @workflow_id: String @batch_id: String @@ -439,10 +565,11 @@ module Karya class StepInspection @identity: Identity @membership: Membership + @child_relationships: ChildRelationships @steps: Array[StepSnapshot] @steps_by_id: Hash[String, StepSnapshot] - def initialize: (identity: Identity, membership: Membership) -> void + def initialize: (identity: Identity, membership: Membership, child_relationships: ChildRelationships) -> void attr_reader steps: Array[StepSnapshot] @@ -453,11 +580,34 @@ module Karya attr_reader identity: Identity attr_reader membership: Membership + attr_reader child_relationships: ChildRelationships attr_reader steps_by_id: Hash[String, StepSnapshot] def build_steps: () -> Array[StepSnapshot] def prerequisite_states_for: (Array[String] prerequisite_job_ids) -> Hash[String, state_name?] end + class ChildWorkflowIds + @child_workflow_ids_by_step_id: Hash[state_name, state_name] + + def initialize: (Hash[state_name, state_name] child_workflow_ids_by_step_id) -> void + def to_h: () -> Hash[String, String] + + private + + attr_reader child_workflow_ids_by_step_id: Hash[state_name, state_name] + end + + class ChildWorkflowList + @child_workflows: Array[ChildWorkflowSnapshot] + + def initialize: (Array[ChildWorkflowSnapshot] child_workflows) -> void + def to_a: () -> Array[ChildWorkflowSnapshot] + + private + + attr_reader child_workflows: Array[ChildWorkflowSnapshot] + end + class SummaryData @state_counts: Hash[state_name, Integer] @total_count: Integer @@ -471,7 +621,7 @@ module Karya attr_reader failed_count: Integer attr_reader state: Karya::workflow_state - def initialize: (Membership membership) -> void + def initialize: (Membership membership, ChildRelationships child_relationships) -> void end class Timestamp @@ -558,17 +708,27 @@ module Karya class State @jobs: Array[Job] + @step_job_ids: Hash[String, String] @dependency_job_ids_by_job_id: Hash[String, Array[String]] + @child_relationships: ChildRelationships @jobs_by_id: Hash[String, Job] + @step_id_by_job_id: Hash[String, String] - def initialize: (jobs: Array[Job], dependency_job_ids_by_job_id: Hash[String, Array[String]]) -> void + def initialize: ( + jobs: Array[Job], + step_job_ids: Hash[String, String], + dependency_job_ids_by_job_id: Hash[String, Array[String]], + child_relationships: ChildRelationships + ) -> void def to_sym: () -> Karya::workflow_state private attr_reader jobs: Array[Job] attr_reader dependency_job_ids_by_job_id: Hash[String, Array[String]] + attr_reader child_relationships: ChildRelationships attr_reader jobs_by_id: Hash[String, Job] + attr_reader step_id_by_job_id: Hash[String, String] def failed?: () -> bool def only_state?: (state_name state) -> bool def terminal_mixed?: () -> bool @@ -576,6 +736,7 @@ module Karya def progressed?: () -> bool def blocked?: () -> bool def dependency_blocked?: (Job job) -> bool + def child_workflow_blocked?: (Job job) -> bool end end @@ -594,6 +755,7 @@ module Karya @handler: String @arguments: Hash[String, job_argument] @depends_on: Array[String] + @child_workflow: String? @compensate_with: String? @compensation_arguments: Hash[String, job_argument] @@ -601,6 +763,7 @@ module Karya attr_reader handler: String attr_reader arguments: Hash[String, job_argument] attr_reader depends_on: Array[String] + attr_reader child_workflow: String? attr_reader compensate_with: String? attr_reader compensation_arguments: Hash[String, job_argument] @@ -610,9 +773,11 @@ module Karya ?arguments: Hash[state_name, job_argument], ?depends_on: (state_name | Array[state_name] | nil), ?compensate_with: state_name?, - ?compensation_arguments: Hash[state_name, job_argument] + ?compensation_arguments: Hash[state_name, job_argument], + ?child_workflow: state_name? ) -> void def compensable?: () -> bool + def child_workflow?: () -> bool private @@ -645,6 +810,7 @@ module Karya def arguments: () -> Hash[state_name, job_argument] def depends_on: () -> (state_name | Array[state_name] | nil) def compensate_with: () -> state_name? + def child_workflow: () -> state_name? def compensation_arguments: () -> Hash[state_name, job_argument] private @@ -665,6 +831,17 @@ module Karya def raw_dependencies: () -> Array[state_name] end + class ChildWorkflow + @value: state_name? + + def initialize: (state_name? value) -> void + def normalize: () -> String? + + private + + attr_reader value: state_name? + end + class CompensationHandler @value: state_name? @@ -682,24 +859,20 @@ module Karya class Definition @id: String @steps: Array[Step] - @step_ids: Array[String] @steps_by_id: Hash[String, Step] + @inspection: Graph::Inspection @dependencies: Array[Dependency] - @dependencies_by_step_id: Hash[String, Array[String]] - @dependents_by_step_id: Hash[String, Array[String]] - @root_step_ids: Array[String] - @leaf_step_ids: Array[String] - @compensable_step_ids: Array[String] attr_reader id: String attr_reader steps: Array[Step] - attr_reader step_ids: Array[String] attr_reader dependencies: Array[Dependency] - attr_reader root_step_ids: Array[String] - attr_reader leaf_step_ids: Array[String] - attr_reader compensable_step_ids: Array[String] def initialize: (id: state_name, steps: Array[Step]) -> void + def step_ids: () -> Array[String] + def root_step_ids: () -> Array[String] + def leaf_step_ids: () -> Array[String] + def compensable_step_ids: () -> Array[String] + def child_step_ids: () -> Array[String] def step: (state_name step_id) -> Step? def fetch_step: (state_name step_id) -> Step def dependencies_for: (state_name step_id) -> Array[String] @@ -707,8 +880,7 @@ module Karya private - attr_reader dependencies_by_step_id: Hash[String, Array[String]] - attr_reader dependents_by_step_id: Hash[String, Array[String]] + attr_reader inspection: Graph::Inspection attr_reader steps_by_id: Hash[String, Step] class Graph @@ -763,6 +935,7 @@ module Karya @root_step_ids: Array[String] @leaf_step_ids: Array[String] @compensable_step_ids: Array[String] + @child_step_ids: Array[String] attr_reader step_ids: Array[String] attr_reader dependencies_by_step_id: Hash[String, Array[String]] @@ -770,6 +943,7 @@ module Karya attr_reader root_step_ids: Array[String] attr_reader leaf_step_ids: Array[String] attr_reader compensable_step_ids: Array[String] + attr_reader child_step_ids: Array[String] def initialize: (Array[Step] steps) -> void @@ -808,6 +982,7 @@ module Karya def root_ids: () -> Array[String] def leaf_ids: (Hash[String, Array[String]] dependents_by_step_id) -> Array[String] def compensable_ids: () -> Array[String] + def child_ids: () -> Array[String] private @@ -834,6 +1009,7 @@ module Karya def root_id: () -> String? def leaf_id: (Hash[String, Array[String]] dependents_by_step_id) -> String? def compensable_id: () -> String? + def child_id: () -> String? private @@ -880,7 +1056,8 @@ module Karya ?arguments: Hash[state_name, job_argument], ?depends_on: (state_name | Array[state_name] | nil), ?compensate_with: state_name?, - ?compensation_arguments: Hash[state_name, job_argument] + ?compensation_arguments: Hash[state_name, job_argument], + ?child_workflow: state_name? ) -> nil def to_definition: () -> Definition diff --git a/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb b/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb index bb39ad59..bb51a557 100644 --- a/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb +++ b/core/karya/spec/karya/queue_store/bulk_mutation_report_spec.rb @@ -67,11 +67,13 @@ def build_report(**overrides) it 'accepts workflow step control actions' do actions = %i[ + enqueue_child_workflow retry_workflow_steps dead_letter_workflow_steps replay_workflow_steps retry_dead_letter_workflow_steps discard_workflow_steps + sync_child_workflows ] expect(actions.map { |action| build_report(action:).action }).to eq(actions) diff --git a/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb b/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb index 661d3324..4619bde5 100644 --- a/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb +++ b/core/karya/spec/karya/queue_store/in_memory/internal/store_state_spec.rb @@ -151,6 +151,49 @@ def rollback_batch_id(batch_id) expect(store_state.batches_by_id.keys).to eq(['batch-1']) end + it 'retains terminal child batches while their parent batch is still active' do + store_state.jobs_by_id['job-parent'] = active_job('job-parent') + store_state.jobs_by_id['job-child'] = succeeded_job('job-child') + store_state.register_batch(batch('parent-batch', ['job-parent'])) + store_state.register_batch(batch('child-batch', ['job-child'])) + store_state.register_workflow( + batch_id: 'parent-batch', + workflow_id: 'parent', + step_job_ids: { 'child' => 'job-parent' }, + dependency_job_ids_by_job_id: {}, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'child' => 'payment' } + ) + store_state.register_workflow( + batch_id: 'child-batch', + workflow_id: 'payment', + step_job_ids: { 'authorize' => 'job-child' }, + dependency_job_ids_by_job_id: {}, + compensation_jobs_by_step_id: {} + ) + relationship = store_state.workflow_children.register( + parent_workflow_id: 'parent', + parent_batch_id: 'parent-batch', + parent_step_id: 'child', + parent_job_id: 'job-parent', + child_workflow_id: 'payment', + child_batch_id: 'child-batch' + ) + + expect(store_state.prune_terminal_batches(0)).to eq([]) + expect(store_state.batches_by_id.keys).to contain_exactly('parent-batch', 'child-batch') + expect(store_state.workflow_children.for_child_batch('child-batch')).to eq(relationship) + expect(store_state.workflow_children.expected_child_workflow_id_by_job_id).to eq('job-parent' => 'payment') + end + + it 'does not allocate empty parent relationship indexes on read paths' do + workflow_children = store_state.workflow_children + + expect(workflow_children.for_parent_step('missing-batch', 'missing-step')).to be_nil + expect(workflow_children.for_parent_batch('missing-batch')).to eq([]) + expect(workflow_children.instance_variable_get(:@by_parent_batch_id)).to eq({}) + end + it 'stores workflow registrations by batch id' do step_job_ids = { 'root' => 'job-root' } dependency_job_ids = [] @@ -182,6 +225,105 @@ def rollback_batch_id(batch_id) expect(store_state.workflow_registrations_by_batch_id['missing']).to be_nil end + it 'cleans up child workflow relationships by parent batch' do + workflow_children = store_state.workflow_children + + expect(workflow_children.delete_by_parent_batch('missing-batch')).to eq([]) + + store_state.register_workflow( + batch_id: 'batch-1', + workflow_id: 'invoice_closeout', + step_job_ids: { 'child' => 'job-child' }, + dependency_job_ids_by_job_id: {}, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'child' => 'payment_flow' } + ) + relationship = workflow_children.register( + parent_workflow_id: 'invoice_closeout', + parent_batch_id: 'batch-1', + parent_step_id: 'child', + parent_job_id: 'job-child', + child_workflow_id: 'payment_flow', + child_batch_id: 'child-batch-1' + ) + + expect(workflow_children.delete_by_parent_batch('batch-1')).to eq([relationship]) + expect(workflow_children.for_parent_batch('batch-1')).to eq([]) + expect(workflow_children.for_parent_job('job-child')).to be_nil + expect(workflow_children.for_child_batch('child-batch-1')).to be_nil + expect(workflow_children.expected_child_workflow_id_by_job_id).to eq({}) + end + + it 'cleans up child workflow relationships by child batch' do + workflow_children = store_state.workflow_children + + expect(workflow_children.delete_by_child_batch('missing-child-batch')).to be_nil + + store_state.register_workflow( + batch_id: 'batch-1', + workflow_id: 'invoice_closeout', + step_job_ids: { 'child' => 'job-child', 'sibling' => 'job-sibling' }, + dependency_job_ids_by_job_id: {}, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'child' => 'payment_flow', 'sibling' => 'shipment_flow' } + ) + relationship = workflow_children.register( + parent_workflow_id: 'invoice_closeout', + parent_batch_id: 'batch-1', + parent_step_id: 'child', + parent_job_id: 'job-child', + child_workflow_id: 'payment_flow', + child_batch_id: 'child-batch-1' + ) + sibling_relationship = workflow_children.register( + parent_workflow_id: 'invoice_closeout', + parent_batch_id: 'batch-1', + parent_step_id: 'sibling', + parent_job_id: 'job-sibling', + child_workflow_id: 'shipment_flow', + child_batch_id: 'child-batch-2' + ) + + expect(workflow_children.delete_by_child_batch('child-batch-1')).to eq(relationship) + expect(workflow_children.for_parent_batch('batch-1')).to eq([sibling_relationship]) + expect(workflow_children.for_parent_job('job-child')).to be_nil + expect(workflow_children.for_child_batch('child-batch-1')).to be_nil + expect(workflow_children.expected_child_workflow_id_by_job_id).to eq('job-sibling' => 'shipment_flow') + + expect(workflow_children.delete_by_child_batch('child-batch-2')).to eq(sibling_relationship) + expect(workflow_children.for_parent_batch('batch-1')).to eq([]) + expect(workflow_children.for_parent_job('job-sibling')).to be_nil + expect(workflow_children.for_child_batch('child-batch-2')).to be_nil + expect(workflow_children.expected_child_workflow_id_by_job_id).to eq({}) + end + + it 'tolerates child workflow cleanup when the parent batch index is already gone' do + workflow_children = store_state.workflow_children + + store_state.register_workflow( + batch_id: 'batch-1', + workflow_id: 'invoice_closeout', + step_job_ids: { 'child' => 'job-child' }, + dependency_job_ids_by_job_id: {}, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'child' => 'payment_flow' } + ) + relationship = workflow_children.register( + parent_workflow_id: 'invoice_closeout', + parent_batch_id: 'batch-1', + parent_step_id: 'child', + parent_job_id: 'job-child', + child_workflow_id: 'payment_flow', + child_batch_id: 'child-batch-1' + ) + workflow_children.instance_variable_get(:@by_parent_batch_id).delete('batch-1') + + expect(workflow_children.delete_by_child_batch('child-batch-1')).to eq(relationship) + expect(workflow_children.for_parent_job('job-child')).to be_nil + expect(workflow_children.for_child_batch('child-batch-1')).to be_nil + expect(workflow_children.expected_child_workflow_id_by_job_id).to eq({}) + end + it 'removes workflow metadata when pruning terminal batches' do store_state.jobs_by_id['job-root'] = succeeded_job('job-root') store_state.jobs_by_id['job-child'] = succeeded_job('job-child') @@ -205,6 +347,23 @@ def rollback_batch_id(batch_id) expect(store_state.workflow_dependency_job_ids_by_job_id).to eq({}) end + it 'removes expected child metadata when pruning a workflow that never enqueued its child batch' do + store_state.jobs_by_id['job-root'] = succeeded_job('job-root') + store_state.register_batch(batch('batch-1', ['job-root'])) + store_state.register_workflow( + batch_id: 'batch-1', + workflow_id: 'invoice_closeout', + step_job_ids: { 'child' => 'job-root' }, + dependency_job_ids_by_job_id: { 'job-root' => [] }, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'child' => 'payment_flow' } + ) + + expect(store_state.workflow_children.expected_child_workflow_id_by_job_id).to eq('job-root' => 'payment_flow') + expect(store_state.prune_terminal_batches(0)).to eq(['batch-1']) + expect(store_state.workflow_children.expected_child_workflow_id_by_job_id).to eq({}) + end + it 'removes workflow rollback metadata when pruning terminal batches' do store_state.jobs_by_id['job-root'] = succeeded_job('job-root') store_state.register_batch(batch('batch-1', ['job-root'])) diff --git a/core/karya/spec/karya/queue_store/in_memory/internal/workflow_child_state_spec.rb b/core/karya/spec/karya/queue_store/in_memory/internal/workflow_child_state_spec.rb new file mode 100644 index 00000000..3ada7115 --- /dev/null +++ b/core/karya/spec/karya/queue_store/in_memory/internal/workflow_child_state_spec.rb @@ -0,0 +1,132 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +RSpec.describe 'Karya::QueueStore::InMemory::Internal::WorkflowChildState' do + subject(:workflow_child_state) { described_class.new(state: store_state, now: captured_at) } + + let(:described_class) do + Karya::QueueStore::InMemory.const_get(:Internal, false).const_get(:WorkflowChildState, false) + end + let(:store_state) do + Karya::QueueStore::InMemory.const_get(:Internal, false).const_get(:StoreState, false).new(expired_tombstone_limit: 16) + end + let(:captured_at) { Time.utc(2026, 4, 25, 12, 0, 0) } + + def batch(id, job_ids) + Karya::Workflow::Batch.new(id:, job_ids:, created_at: captured_at) + end + + def job(id, state:) + Karya::Job.new(id:, queue: 'billing', handler: 'billing_sync', state:, created_at: captured_at) + end + + it 'includes nested child workflow relationship snapshots when deriving child state' do + fresh_workflow_child_state = described_class.new(state: store_state, now: captured_at) + + store_state.jobs_by_id['job-authorize'] = job('job-authorize', state: :succeeded) + store_state.jobs_by_id['job-risk_review'] = job('job-risk_review', state: :queued) + store_state.jobs_by_id['job-approve'] = job('job-approve', state: :succeeded) + store_state.register_batch(batch('payment-batch', %w[job-authorize job-risk_review])) + store_state.register_batch(batch('risk-review-batch', ['job-approve'])) + store_state.register_workflow( + batch_id: 'payment-batch', + workflow_id: 'payment', + step_job_ids: { + 'authorize' => 'job-authorize', + 'risk_review' => 'job-risk_review' + }, + dependency_job_ids_by_job_id: { + 'job-authorize' => [], + 'job-risk_review' => ['job-authorize'] + }, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'risk_review' => 'risk_review' } + ) + store_state.register_workflow( + batch_id: 'risk-review-batch', + workflow_id: 'risk_review', + step_job_ids: { 'approve' => 'job-approve' }, + dependency_job_ids_by_job_id: { 'job-approve' => [] }, + compensation_jobs_by_step_id: {} + ) + store_state.workflow_children.register( + parent_workflow_id: 'parent', + parent_batch_id: 'parent-batch', + parent_step_id: 'payment_subflow', + parent_job_id: 'job-parent', + child_workflow_id: 'payment', + child_batch_id: 'payment-batch' + ) + store_state.workflow_children.register( + parent_workflow_id: 'payment', + parent_batch_id: 'payment-batch', + parent_step_id: 'risk_review', + parent_job_id: 'job-risk_review', + child_workflow_id: 'risk_review', + child_batch_id: 'risk-review-batch' + ) + + expect(fresh_workflow_child_state.resolve('payment-batch')).to eq(:running) + expect(workflow_child_state.resolve('risk-review-batch')).to eq(:succeeded) + expect(workflow_child_state.resolve('payment-batch')).to eq(:running) + end + + it 'raises a workflow execution error for child workflow cycles' do + store_state.jobs_by_id['job-a'] = job('job-a', state: :queued) + store_state.jobs_by_id['job-b'] = job('job-b', state: :queued) + store_state.register_batch(batch('batch-a', ['job-a'])) + store_state.register_batch(batch('batch-b', ['job-b'])) + store_state.register_workflow( + batch_id: 'batch-a', + workflow_id: 'workflow-a', + step_job_ids: { 'step_a' => 'job-a' }, + dependency_job_ids_by_job_id: { 'job-a' => [] }, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'step_a' => 'workflow-b' } + ) + store_state.register_workflow( + batch_id: 'batch-b', + workflow_id: 'workflow-b', + step_job_ids: { 'step_b' => 'job-b' }, + dependency_job_ids_by_job_id: { 'job-b' => [] }, + compensation_jobs_by_step_id: {}, + child_workflow_ids_by_step_id: { 'step_b' => 'workflow-a' } + ) + store_state.workflow_children.register( + parent_workflow_id: 'workflow-a', + parent_batch_id: 'batch-a', + parent_step_id: 'step_a', + parent_job_id: 'job-a', + child_workflow_id: 'workflow-b', + child_batch_id: 'batch-b' + ) + store_state.workflow_children.register( + parent_workflow_id: 'workflow-b', + parent_batch_id: 'batch-b', + parent_step_id: 'step_b', + parent_job_id: 'job-b', + child_workflow_id: 'workflow-a', + child_batch_id: 'batch-a' + ) + + expect { workflow_child_state.resolve('batch-a') }.to raise_error( + Karya::Workflow::InvalidExecutionError, + 'child workflow cycle detected at batch "batch-a"' + ) + end + + it 'preserves ancestor cycle markers when a nested cycle raise is rescued' do + visiting = { 'batch-a' => true } + resolver = described_class.new(state: store_state, now: captured_at, visiting:) + + expect { resolver.resolve('batch-a') }.to raise_error( + Karya::Workflow::InvalidExecutionError, + 'child workflow cycle detected at batch "batch-a"' + ) + expect(visiting).to eq({ 'batch-a' => true }) + end +end diff --git a/core/karya/spec/karya/queue_store/in_memory/internal/workflow_support_spec.rb b/core/karya/spec/karya/queue_store/in_memory/internal/workflow_support_spec.rb index 65fa4789..dfa2607f 100644 --- a/core/karya/spec/karya/queue_store/in_memory/internal/workflow_support_spec.rb +++ b/core/karya/spec/karya/queue_store/in_memory/internal/workflow_support_spec.rb @@ -29,8 +29,8 @@ def rollback_batch_id(batch_id) root_job = job(id: 'job-2', state: :queued) store.send(:state).workflow_dependency_job_ids_by_job_id['job-2'] = [] - expect(store.send(:workflow_dependencies_satisfied?, plain_job)).to be(true) - expect(store.send(:workflow_dependencies_satisfied?, root_job)).to be(true) + expect(store.send(:workflow_dependencies_satisfied?, plain_job, now: created_at)).to be(true) + expect(store.send(:workflow_dependencies_satisfied?, root_job, now: created_at)).to be(true) end it 'requires every prerequisite job to be succeeded' do @@ -41,20 +41,20 @@ def rollback_batch_id(batch_id) store.send(:state).jobs_by_id['job-2'] = queued store.send(:state).workflow_dependency_job_ids_by_job_id['job-3'] = %w[job-1 job-2] - expect(store.send(:workflow_dependencies_satisfied?, dependent)).to be(false) + expect(store.send(:workflow_dependencies_satisfied?, dependent, now: created_at)).to be(false) store.send(:state).jobs_by_id['job-2'] = job(id: 'job-2', state: :reserved) - expect(store.send(:workflow_dependencies_satisfied?, dependent)).to be(false) + expect(store.send(:workflow_dependencies_satisfied?, dependent, now: created_at)).to be(false) store.send(:state).jobs_by_id['job-2'] = job(id: 'job-2', state: :succeeded) - expect(store.send(:workflow_dependencies_satisfied?, dependent)).to be(true) + expect(store.send(:workflow_dependencies_satisfied?, dependent, now: created_at)).to be(true) end it 'treats missing prerequisite jobs as blocked' do dependent = job(id: 'job-2', state: :queued) store.send(:state).workflow_dependency_job_ids_by_job_id['job-2'] = ['missing'] - expect(store.send(:workflow_dependencies_satisfied?, dependent)).to be(false) + expect(store.send(:workflow_dependencies_satisfied?, dependent, now: created_at)).to be(false) end it 'builds step-to-job metadata in workflow definition order' do diff --git a/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb b/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb index f94ef50d..5d128376 100644 --- a/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb +++ b/core/karya/spec/karya/queue_store/in_memory_workflow_spec.rb @@ -1033,4 +1033,240 @@ def run_successfully(reservation, start_offset:, complete_offset:) expect(reserve(3)).to be_nil end end + + describe 'child workflow orchestration' do + def parent_definition + Karya::Workflow.define(:parent) do + step :prepare, handler: :prepare + step :payment_subflow, handler: :payment_subflow, depends_on: :prepare, child_workflow: :payment + step :receipt, handler: :receipt, depends_on: :payment_subflow + end + end + + def child_definition + Karya::Workflow.define(:payment) do + step :authorize, handler: :authorize + step :capture, handler: :capture, depends_on: :authorize + end + end + + def enqueue_parent + store.enqueue_workflow( + definition: parent_definition, + jobs_by_step_id: { + prepare: workflow_job(:prepare), + payment_subflow: workflow_job(:payment_subflow), + receipt: workflow_job(:receipt) + }, + batch_id: :parent_batch, + now: created_at + 1 + ) + end + + def enqueue_child + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: child_definition, + jobs_by_step_id: { + authorize: workflow_job(:authorize), + capture: workflow_job(:capture) + }, + batch_id: :payment_batch, + now: created_at + 5 + ) + end + + it 'keeps child workflow batches separate and gates the parent child step until child success' do + enqueue_parent + prepare = reserve(2) + run_successfully(prepare, start_offset: 3, complete_offset: 4) + + expect(reserve(5, handler_names: ['payment_subflow'])).to be_nil + report = enqueue_child + expect(report.action).to eq(:enqueue_child_workflow) + expect(store.batch_snapshot(batch_id: :parent_batch, now: created_at + 6).job_ids).to eq( + %w[job-prepare job-payment_subflow job-receipt] + ) + expect(store.batch_snapshot(batch_id: :payment_batch, now: created_at + 6).job_ids).to eq(%w[job-authorize job-capture]) + + authorize = reserve(7) + expect(authorize.job_id).to eq('job-authorize') + run_successfully(authorize, start_offset: 8, complete_offset: 9) + capture = reserve(10) + expect(capture.job_id).to eq('job-capture') + run_successfully(capture, start_offset: 11, complete_offset: 12) + + parent_snapshot = store.workflow_snapshot(batch_id: :parent_batch, now: created_at + 13) + expect(parent_snapshot.fetch_step(:payment_subflow)).to be_ready + expect(parent_snapshot.child_workflow(:payment_subflow)).to have_attributes( + child_batch_id: 'payment_batch', + child_state: :succeeded + ) + expect(store.workflow_snapshot(batch_id: :payment_batch, now: created_at + 13).parent).to have_attributes( + parent_batch_id: 'parent_batch', + parent_step_id: 'payment_subflow' + ) + + parent_gate = reserve(14, handler_names: ['payment_subflow']) + expect(parent_gate.job_id).to eq('job-payment_subflow') + run_successfully(parent_gate, start_offset: 15, complete_offset: 16) + expect(reserve(17, handler_names: ['receipt']).job_id).to eq('job-receipt') + end + + it 'syncs failed child workflows into failed parent child steps' do + enqueue_parent + run_successfully(reserve(2), start_offset: 3, complete_offset: 4) + enqueue_child + run_successfully(reserve(6), start_offset: 7, complete_offset: 8) + capture = reserve(9) + store.start_execution(reservation_token: capture.token, now: created_at + 10) + store.fail_execution(reservation_token: capture.token, now: created_at + 11, failure_classification: :error) + + report = store.sync_child_workflows(parent_batch_id: :parent_batch, now: created_at + 12) + + expect(report.action).to eq(:sync_child_workflows) + expect(report.changed_jobs).to contain_exactly( + have_attributes(id: 'job-payment_subflow', state: :dead_letter, dead_letter_reason: 'child workflow payment_batch failed') + ) + expect(store.workflow_snapshot(batch_id: :parent_batch, now: created_at + 13)).to have_attributes(state: :failed) + end + + it 'syncs cancelled child workflows into cancelled parent child steps' do + enqueue_parent + run_successfully(reserve(2), start_offset: 3, complete_offset: 4) + enqueue_child + store.cancel_jobs(job_ids: %w[job-authorize job-capture], now: created_at + 6) + + report = store.sync_child_workflows(parent_batch_id: :parent_batch, now: created_at + 7) + + expect(report.changed_jobs).to contain_exactly(have_attributes(id: 'job-payment_subflow', state: :cancelled)) + expect(store.workflow_snapshot(batch_id: :parent_batch, now: created_at + 8).fetch_step(:payment_subflow)).to be_terminal + end + + it 'skips sync while child workflows remain nonterminal' do + enqueue_parent + run_successfully(reserve(2), start_offset: 3, complete_offset: 4) + enqueue_child + + report = store.sync_child_workflows(parent_batch_id: :parent_batch, now: created_at + 6) + + expect(report.changed_jobs).to eq([]) + expect(report.skipped_jobs).to contain_exactly( + include(job_id: 'job-payment_subflow', reason: :ineligible_state, state: :queued) + ) + end + + it 'exposes nested child workflow declarations on child snapshots' do + nested_child = Karya::Workflow.define(:payment) do + step :authorize, handler: :authorize + step :risk_review, handler: :risk_review, child_workflow: :risk_review + end + enqueue_parent + + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: nested_child, + jobs_by_step_id: { + authorize: workflow_job(:authorize), + risk_review: workflow_job(:risk_review) + }, + batch_id: :payment_batch, + now: created_at + 2 + ) + + expect(store.workflow_snapshot(batch_id: :payment_batch, now: created_at + 3).fetch_step(:risk_review)).to have_attributes( + child_workflow_id: 'risk_review', + child_workflow: nil + ) + end + + it 'rejects invalid child workflow enqueue without partial writes' do + enqueue_parent + wrong_child = Karya::Workflow.define(:shipment) { step :book, handler: :book } + + expect do + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: wrong_child, + jobs_by_step_id: { book: workflow_job(:book) }, + batch_id: :payment_batch, + now: created_at + 2 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow "shipment" does not match parent step "payment_subflow"') + + expect do + store.workflow_snapshot(batch_id: :payment_batch, now: created_at + 3) + end.to raise_error(Karya::Workflow::UnknownBatchError) + expect(store.workflow_snapshot(batch_id: :parent_batch, now: created_at + 3).child_workflows).to eq([]) + end + + it 'rejects invalid child workflow boundaries without partial writes' do + enqueue_parent + + expect do + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :prepare, + definition: child_definition, + jobs_by_step_id: { authorize: workflow_job(:authorize), capture: workflow_job(:capture) }, + batch_id: :payment_batch, + now: created_at + 2 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'workflow step "prepare" is not a child workflow step') + + expect do + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: 'payment', + jobs_by_step_id: { authorize: workflow_job(:authorize), capture: workflow_job(:capture) }, + batch_id: :payment_batch, + now: created_at + 3 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'definition must be a Karya::Workflow::Definition') + + expect do + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: child_definition, + jobs_by_step_id: { authorize: workflow_job(:authorize), capture: workflow_job(:capture) }, + batch_id: :parent_batch, + now: created_at + 4 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow batch id must differ from parent batch id') + + store.cancel_jobs(job_ids: ['job-payment_subflow'], now: created_at + 5) + expect do + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: child_definition, + jobs_by_step_id: { authorize: workflow_job(:authorize), capture: workflow_job(:capture) }, + batch_id: :payment_batch, + now: created_at + 6 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'parent child workflow step "payment_subflow" must be queued') + expect(store.workflow_snapshot(batch_id: :parent_batch, now: created_at + 7).child_workflows).to eq([]) + end + + it 'rejects duplicate child workflow registrations' do + enqueue_parent + enqueue_child + + expect do + store.enqueue_child_workflow( + parent_batch_id: :parent_batch, + parent_step_id: :payment_subflow, + definition: child_definition, + jobs_by_step_id: { authorize: workflow_job(:authorize), capture: workflow_job(:capture) }, + batch_id: :payment_batch_two, + now: created_at + 6 + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow already registered for step "payment_subflow"') + end + end end diff --git a/core/karya/spec/karya/queue_store_base_spec.rb b/core/karya/spec/karya/queue_store_base_spec.rb index 8f998542..a5535850 100644 --- a/core/karya/spec/karya/queue_store_base_spec.rb +++ b/core/karya/spec/karya/queue_store_base_spec.rb @@ -49,6 +49,25 @@ end.to raise_error(NotImplementedError, /implement #workflow_snapshot/) end + it 'requires enqueue_child_workflow to be implemented' do + expect do + store.enqueue_child_workflow( + parent_batch_id: 'parent-batch', + parent_step_id: 'child-step', + definition: instance_double(Karya::Workflow::Definition), + jobs_by_step_id: {}, + batch_id: 'child-batch', + now: Time.utc(2026, 3, 27, 12, 0, 0) + ) + end.to raise_error(NotImplementedError, /implement #enqueue_child_workflow/) + end + + it 'requires sync_child_workflows to be implemented' do + expect do + store.sync_child_workflows(parent_batch_id: 'parent-batch', now: Time.utc(2026, 3, 27, 12, 0, 0)) + end.to raise_error(NotImplementedError, /implement #sync_child_workflows/) + end + it 'requires rollback_workflow to be implemented' do expect do store.rollback_workflow(batch_id: 'batch-1', now: Time.utc(2026, 3, 27, 12, 0, 0), reason: 'operator rollback') diff --git a/core/karya/spec/karya/workflow/child_workflow_snapshot_spec.rb b/core/karya/spec/karya/workflow/child_workflow_snapshot_spec.rb new file mode 100644 index 00000000..da9652b8 --- /dev/null +++ b/core/karya/spec/karya/workflow/child_workflow_snapshot_spec.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +# Copyright Codevedas Inc. 2025-present +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +RSpec.describe Karya::Workflow::ChildWorkflowSnapshot do + let(:snapshot) do + described_class.new( + parent_workflow_id: ' parent ', + parent_batch_id: ' parent-batch ', + parent_step_id: ' child-step ', + parent_job_id: ' parent-job ', + child_workflow_id: ' child ', + child_batch_id: ' child-batch ', + child_state: :running + ) + end + + it 'normalizes ids and freezes the snapshot' do + expect(snapshot).to have_attributes( + parent_workflow_id: 'parent', + parent_batch_id: 'parent-batch', + parent_step_id: 'child-step', + parent_job_id: 'parent-job', + child_workflow_id: 'child', + child_batch_id: 'child-batch', + child_state: :running + ) + expect(snapshot).to be_frozen + end + + it 'rejects invalid workflow states' do + expect do + described_class.new( + parent_workflow_id: :parent, + parent_batch_id: :parent_batch, + parent_step_id: :child_step, + parent_job_id: :parent_job, + child_workflow_id: :child, + child_batch_id: :child_batch, + child_state: :unknown + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_state must be a workflow state') + end + + it 'rejects unknown attributes' do + expect do + described_class.new( + parent_workflow_id: :parent, + parent_batch_id: :parent_batch, + parent_step_id: :child_step, + parent_job_id: :parent_job, + child_workflow_id: :child, + child_batch_id: :child_batch, + child_state: :running, + unexpected: true + ) + end.to raise_error(ArgumentError, 'unknown keyword: :unexpected') + end +end diff --git a/core/karya/spec/karya/workflow/definition_spec.rb b/core/karya/spec/karya/workflow/definition_spec.rb index 13419d83..e397f0f5 100644 --- a/core/karya/spec/karya/workflow/definition_spec.rb +++ b/core/karya/spec/karya/workflow/definition_spec.rb @@ -16,7 +16,8 @@ id: :emit_receipt, handler: :emit_receipt, depends_on: :capture_payment, - compensate_with: :void_receipt + compensate_with: :void_receipt, + child_workflow: :receipt_subflow ) definition = described_class.new(id: :invoice_closeout, steps: [calculate_totals, capture_payment, emit_receipt]) @@ -35,6 +36,7 @@ expect(definition.root_step_ids).to eq(['calculate_totals']) expect(definition.leaf_step_ids).to eq(['emit_receipt']) expect(definition.compensable_step_ids).to eq(['emit_receipt']) + expect(definition.child_step_ids).to eq(['emit_receipt']) expect(definition).to be_frozen end diff --git a/core/karya/spec/karya/workflow/snapshot_spec.rb b/core/karya/spec/karya/workflow/snapshot_spec.rb index 33177097..29dae6e1 100644 --- a/core/karya/spec/karya/workflow/snapshot_spec.rb +++ b/core/karya/spec/karya/workflow/snapshot_spec.rb @@ -28,7 +28,19 @@ def rollback ) end - def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil) + def child_workflow(state: :running) + Karya::Workflow::ChildWorkflowSnapshot.new( + parent_workflow_id: :invoice_closeout, + parent_batch_id: 'batch_1', + parent_step_id: :child, + parent_job_id: :job_child, + child_workflow_id: :payment, + child_batch_id: :payment_batch, + child_state: state + ) + end + + def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil, child_workflows: [], child_workflow_ids_by_step_id: {}, parent: nil) described_class.new( workflow_id: ' invoice_closeout ', batch_id: ' batch_1 ', @@ -36,6 +48,9 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil) step_job_ids: step_job_ids || jobs.to_h { |workflow_job| [workflow_job.id.delete_prefix('job_'), workflow_job.id] }, dependency_job_ids_by_job_id: dependencies, jobs:, + child_workflow_ids_by_step_id:, + child_workflows:, + parent:, rollback: ) end @@ -89,6 +104,119 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil) ) end + it 'exposes parent and child workflow relationship metadata' do + jobs = [job(id: 'job_root', state: :succeeded), job(id: 'job_child', state: :queued)] + relationship = child_workflow(state: :succeeded) + + result = snapshot( + jobs:, + step_job_ids: { root: 'job_root', child: 'job_child' }, + dependencies: { 'job_child' => ['job_root'] }, + child_workflow_ids_by_step_id: { child: :payment }, + child_workflows: [relationship] + ) + + expect(result.child_workflows).to eq([relationship]) + expect(result.child_workflow(:child)).to eq(relationship) + expect(result.fetch_child_workflow(' child ')).to eq(relationship) + expect(result.parent).to be_nil + expect(result.fetch_step(:child)).to have_attributes( + child_workflow_id: 'payment', + child_workflow: relationship + ) + expect(result.fetch_step(:child)).to be_ready + end + + it 'treats waiting child workflow steps as blocked until the child succeeds' do + jobs = [job(id: 'job_child', state: :queued)] + + missing_child = snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + child_workflow_ids_by_step_id: { child: :payment } + ) + running_child = snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + child_workflow_ids_by_step_id: { child: :payment }, + child_workflows: [child_workflow(state: :running)] + ) + succeeded_child = snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + child_workflow_ids_by_step_id: { child: :payment }, + child_workflows: [child_workflow(state: :succeeded)] + ) + + expect(missing_child.state).to eq(:blocked) + expect(running_child.state).to eq(:blocked) + expect(succeeded_child.state).to eq(:pending) + end + + it 'exposes parent workflow metadata for child batch snapshots' do + jobs = [job(id: 'job_authorize', state: :queued)] + relationship = Karya::Workflow::ChildWorkflowSnapshot.new( + parent_workflow_id: :invoice_closeout, + parent_batch_id: 'parent_batch', + parent_step_id: :child, + parent_job_id: :job_child, + child_workflow_id: :payment, + child_batch_id: 'batch_1', + child_state: :running + ) + + result = described_class.new( + workflow_id: 'payment', + batch_id: 'batch_1', + captured_at:, + step_job_ids: { authorize: 'job_authorize' }, + dependency_job_ids_by_job_id: {}, + jobs:, + parent: relationship + ) + + expect(result.parent).to eq(relationship) + end + + it 'validates parent and child workflow relationship metadata' do + jobs = [job(id: 'job_child', state: :queued)] + relationship = child_workflow(state: :running) + + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, parent: 'parent') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'parent must be Karya::Workflow::ChildWorkflowSnapshot') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, child_workflow_ids_by_step_id: 'child') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflow_ids_by_step_id must be a Hash') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, child_workflows: 'child') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflows must be an Array') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, child_workflows: ['child']) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflows entries must be Karya::Workflow::ChildWorkflowSnapshot') + expect do + snapshot(jobs:, step_job_ids: { child: 'job_child' }, child_workflows: [relationship]) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unknown child workflow step "child"') + + mismatched_relationship = child_workflow(state: :running) + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + child_workflow_ids_by_step_id: { child: :shipment }, + child_workflows: [mismatched_relationship] + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow relationship id must match declared child workflow id') + expect do + snapshot( + jobs:, + step_job_ids: { child: 'job_child' }, + child_workflow_ids_by_step_id: { child: :payment }, + child_workflows: [relationship, relationship] + ) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'duplicate child workflow for step "child"') + end + it 'raises execution errors for unknown runtime step lookup' do result = snapshot(jobs: [job(id: 'job_root', state: :queued)]) @@ -96,6 +224,10 @@ def snapshot(jobs:, step_job_ids: nil, dependencies: {}, rollback: nil) expect do result.fetch_step(:missing) end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unknown workflow step "missing"') + expect(result.child_workflow(:missing)).to be_nil + expect do + result.fetch_child_workflow(:missing) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'unknown child workflow for step "missing"') end it 'rejects invalid identifiers and timestamps' do diff --git a/core/karya/spec/karya/workflow/step_snapshot_spec.rb b/core/karya/spec/karya/workflow/step_snapshot_spec.rb index c5491af6..9f8c84c9 100644 --- a/core/karya/spec/karya/workflow/step_snapshot_spec.rb +++ b/core/karya/spec/karya/workflow/step_snapshot_spec.rb @@ -30,6 +30,18 @@ def snapshot(state: :queued, prerequisite_states: { 'job-root' => :succeeded }) ) end + def child_workflow(state) + Karya::Workflow::ChildWorkflowSnapshot.new( + parent_workflow_id: :invoice_closeout, + parent_batch_id: 'batch_1', + parent_step_id: :child, + parent_job_id: :'job-child', + child_workflow_id: :payment, + child_batch_id: :payment_batch, + child_state: state + ) + end + it 'builds immutable per-step inspection data' do result = snapshot @@ -58,6 +70,75 @@ def snapshot(state: :queued, prerequisite_states: { 'job-root' => :succeeded }) expect(snapshot(state: :succeeded)).to be_terminal end + it 'blocks child workflow steps until the child workflow succeeds' do + missing_child = described_class.new( + workflow_id: :invoice_closeout, + batch_id: 'batch_1', + step_id: :child, + job_id: :'job-child', + job: job, + prerequisite_job_ids: [], + prerequisite_states: {}, + child_workflow_id: :payment + ) + running_child = described_class.new( + workflow_id: :invoice_closeout, + batch_id: 'batch_1', + step_id: :child, + job_id: :'job-child', + job: job, + prerequisite_job_ids: [], + prerequisite_states: {}, + child_workflow_id: :payment, + child_workflow: child_workflow(:running) + ) + succeeded_child = described_class.new( + workflow_id: :invoice_closeout, + batch_id: 'batch_1', + step_id: :child, + job_id: :'job-child', + job: job, + prerequisite_job_ids: [], + prerequisite_states: {}, + child_workflow_id: :payment, + child_workflow: child_workflow(:succeeded) + ) + + expect(missing_child).to be_child_workflow + expect(missing_child).to be_blocked + expect(running_child).to be_blocked + expect(succeeded_child).to be_ready + end + + it 'validates child workflow relationship metadata' do + common_attributes = { + workflow_id: :invoice_closeout, + batch_id: 'batch_1', + step_id: :child, + job_id: :'job-child', + job: job, + prerequisite_job_ids: [], + prerequisite_states: {}, + child_workflow_id: :payment + } + + expect do + described_class.new(**common_attributes, child_workflow: 'payment') + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflow must be Karya::Workflow::ChildWorkflowSnapshot') + expect do + described_class.new(**common_attributes, child_workflow_id: :shipment, child_workflow: child_workflow(:running)) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child_workflow_id must match child workflow relationship') + expect do + described_class.new(**common_attributes, batch_id: :other_batch, child_workflow: child_workflow(:running)) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow parent batch must match step batch') + expect do + described_class.new(**common_attributes, step_id: :other_step, child_workflow: child_workflow(:running)) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow parent step must match step id') + expect do + described_class.new(**common_attributes, job_id: :other_job, job: job(id: 'other_job'), child_workflow: child_workflow(:running)) + end.to raise_error(Karya::Workflow::InvalidExecutionError, 'child workflow parent job must match step job') + end + it 'treats custom nonterminal lifecycle states as active' do Karya::JobLifecycle.register_state(:awaiting_review) diff --git a/core/karya/spec/karya/workflow/step_spec.rb b/core/karya/spec/karya/workflow/step_spec.rb index 75d9bd81..6562d3d9 100644 --- a/core/karya/spec/karya/workflow/step_spec.rb +++ b/core/karya/spec/karya/workflow/step_spec.rb @@ -38,12 +38,21 @@ expect(step.compensation_arguments).to be_frozen end + it 'normalizes optional child workflow metadata' do + step = described_class.new(id: :capture_payment, handler: :capture_payment, child_workflow: ' payment_subflow ') + + expect(step.child_workflow?).to be(true) + expect(step.child_workflow).to eq('payment_subflow') + end + it 'defaults compensation metadata to no-op rollback behavior' do step = described_class.new(id: :capture_payment, handler: :capture_payment) expect(step.compensable?).to be(false) expect(step.compensate_with).to be_nil expect(step.compensation_arguments).to eq({}) + expect(step.child_workflow?).to be(false) + expect(step.child_workflow).to be_nil end it 'rejects invalid compensation arguments' do diff --git a/docs/pages/runtime/controls.md b/docs/pages/runtime/controls.md index c58c1095..55a881a0 100644 --- a/docs/pages/runtime/controls.md +++ b/docs/pages/runtime/controls.md @@ -85,6 +85,10 @@ operator search and governance layers choose targets: replay, retry-dead-letter, and discard only act when the primary step job is currently `:dead_letter` and otherwise skip the targeted step as ineligible; they do not add selectors, automatic rollback, or event-history replay +- child workflow controls enqueue child batches for declared parent child steps + and synchronize terminal child state back to the parent gate job; they do not + automatically enqueue children from workers or cascade rollback across parent + and child workflows - bulk retry returns failed or `retry_pending` jobs to normal queued execution when they are still eligible and uniqueness-safe - bulk cancellation can stop queued, retry-pending, reserved, or running jobs; diff --git a/docs/pages/troubleshooting.md b/docs/pages/troubleshooting.md index d08463a3..122d5813 100644 --- a/docs/pages/troubleshooting.md +++ b/docs/pages/troubleshooting.md @@ -168,6 +168,21 @@ For dead-lettered work, `replay_workflow_steps` returns the step to `queued`; `retry_dead_letter_workflow_steps` returns it to `retry_pending` until the configured retry time. Dependents unblock only after the parent succeeds. +### Child Workflow Step Does Not Reserve + +Child workflow parent steps are gate jobs. They wait for the child workflow to +succeed before workers can reserve the parent-side step: + +```text +symptom: parent child step stays queued +first checks: workflow_snapshot.fetch_step(:payment).child_workflow_id, workflow_snapshot.fetch_step(:payment).child_workflow +next move: if child_workflow is nil, enqueue or register the declared child workflow; otherwise inspect and recover it by child_batch_id +``` + +If the child workflow is failed or cancelled, run `sync_child_workflows` against +the parent batch to propagate that terminal state to the parent gate job. Sync +does not roll back either workflow automatically. + ### Rollback Has No Batch To Inspect No-op rollback is valid when every succeeded primary step is uncompensated: diff --git a/docs/pages/workflows/child-workflows.md b/docs/pages/workflows/child-workflows.md index f1b3d829..4318f6fe 100644 --- a/docs/pages/workflows/child-workflows.md +++ b/docs/pages/workflows/child-workflows.md @@ -7,25 +7,63 @@ permalink: /workflows/child-workflows/ # Child Workflows -Child workflows and subflow orchestration are documented as explicit workflow -primitives. +Child workflows and subflow orchestration are explicit relationships between +workflow batches. A parent step can declare that it is backed by a child +workflow, and the child run is enqueued as its own immutable workflow batch. ## Covered Behavior - parent-child lifecycle relationships - success, failure, cancellation, and recovery behavior - operator visibility across related executions +- explicit sync boundaries instead of hidden background propagation ## Operational Expectations Operators need to inspect workflow hierarchies clearly rather than treating -subflows as opaque implementation detail. +subflows as opaque implementation detail. Child workflow batches remain normal +workflow batches: they can be inspected, retried, replayed, rolled back, and +recovered by their own batch id. + +Parent-child propagation is explicit. Karya does not automatically enqueue a +child workflow from worker execution, and it does not automatically cascade +rollback between parent and child workflows. ## Common Scenarios +### Declaring A Child Step + +```ruby +parent = Karya::Workflow.define(:order_fulfillment) do + step :validate_order, handler: :validate_order + step :payment, handler: :payment_gate, depends_on: :validate_order, child_workflow: :payment_authorization + step :ship_order, handler: :ship_order, depends_on: :payment +end +``` + +The `payment` step still binds to one concrete parent job. That job acts as the +parent-side gate for downstream dependencies, and it is not reservable until +the child workflow succeeds. + +### Enqueuing The Child Workflow + +```ruby +store.enqueue_child_workflow( + parent_batch_id: :order_88, + parent_step_id: :payment, + definition: payment_authorization, + jobs_by_step_id: payment_jobs, + batch_id: :payment_authorization_88, + now: Time.now +) +``` + +The child workflow batch is separate from the parent batch. Parent membership +does not grow when a child is enqueued. + ### Inspecting A Workflow Hierarchy -Child workflows should surface parent-child relationships directly: +Child workflows surface parent-child relationships directly: ```text parent_workflow: order-fulfillment-88 @@ -37,6 +75,29 @@ status: waiting-on-children Subflows remain visible execution units with explicit relationships. +```ruby +snapshot = store.workflow_snapshot(batch_id: :order_88, now: Time.now) + +snapshot.fetch_step(:payment).child_workflow.child_batch_id +#=> "payment_authorization_88" + +store.workflow_snapshot(batch_id: :payment_authorization_88, now: Time.now).parent.parent_batch_id +#=> "order_88" +``` + +### Synchronizing Lifecycle State + +When a child workflow succeeds, the parent gate step becomes reservable and the +worker completes that gate job normally. When a child workflow fails or is +cancelled, operators synchronize the relationship explicitly: + +```ruby +store.sync_child_workflows(parent_batch_id: :order_88, now: Time.now) +``` + +Sync propagates terminal child failure or cancellation to the parent child-step +job. It does not roll back parent or child workflows automatically. + ## Related Concepts - [Workflow Basics](/workflows/basics/): child workflows extend the orchestration model diff --git a/docs/pages/workflows/replay.md b/docs/pages/workflows/replay.md index b483bfaf..a1c749bb 100644 --- a/docs/pages/workflows/replay.md +++ b/docs/pages/workflows/replay.md @@ -114,6 +114,23 @@ Rollback compensates succeeded compensable primary steps in reverse workflow definition order. If no succeeded step has compensation, Karya records the rollback request boundary without creating a physical rollback batch. +### Child Workflow Recovery + +Child workflows recover by their own workflow batch id. Recovering a child does +not mutate the parent automatically: + +```text +parent_workflow_batch_id: order-88 +child_workflow_batch_id: payment-authorization-88 +selected_action: replay_workflow_steps +step_ids: authorize_payment +expected_result: child step queues again; parent gate stays blocked until the child succeeds +``` + +After a child succeeds, the parent child-step job becomes reservable. If the +child fails or is cancelled, use `sync_child_workflows` against the parent batch +to propagate that terminal state to the parent gate job. + ## Related Concepts - [Signals](/workflows/signals/): interactive workflows need recovery and live control