Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions core/karya/lib/karya/queue_store/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,33 @@ def workflow_snapshot(batch_id:, now:)
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Enqueue a child workflow batch under one declared parent child step.
# Abstract hook: every concrete queue store must override this method.
def enqueue_child_workflow(
  parent_batch_id:,
  parent_step_id:,
  definition:,
  jobs_by_step_id:,
  batch_id:,
  now:,
  compensation_jobs_by_step_id: {}
)
  # Reference every keyword once so unused-argument linting stays quiet
  # in this abstract stub; concrete implementations consume them all.
  _ = [parent_batch_id, parent_step_id, definition, jobs_by_step_id,
       batch_id, now, compensation_jobs_by_step_id]
  raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Synchronize terminal child workflow state into parent child-step jobs.
# Abstract hook: every concrete queue store must override this method.
def sync_child_workflows(parent_batch_id:, now:)
  # Reference both keywords once to keep unused-argument linting quiet.
  _ = [parent_batch_id, now]
  raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Trigger explicit saga rollback for one failed workflow batch.
def rollback_workflow(batch_id:, now:, reason:)
_batch_id = batch_id
Expand Down
6 changes: 4 additions & 2 deletions core/karya/lib/karya/queue_store/bulk_mutation_report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ class BulkMutationReport
replay_dead_letter_jobs
retry_dead_letter_jobs
discard_dead_letter_jobs
enqueue_child_workflow
rollback_workflow
sync_child_workflows
retry_workflow_steps
dead_letter_workflow_steps
replay_workflow_steps
Expand Down Expand Up @@ -128,9 +130,9 @@ def skipped_reason_error_message

# Validation message naming every supported bulk-mutation action symbol.
# NOTE: the rendered diff left both the pre-change and post-change lines in
# place, duplicating one chunk and leaving a dangling second string literal;
# this is the reconstructed post-merge body (includes :enqueue_child_workflow
# and ends with :sync_child_workflows).
def action_error_message
  'action must be one of :enqueue_many, :retry_jobs, :cancel_jobs, :dead_letter_jobs, ' \
    ':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, ' \
    ':enqueue_child_workflow, :rollback_workflow, ' \
    ':retry_workflow_steps, :dead_letter_workflow_steps, :replay_workflow_steps, ' \
    ':retry_dead_letter_workflow_steps, :discard_workflow_steps, or :sync_child_workflows'
end

private_constant :ACTIONS, :JobIdList, :JobList, :SKIPPED_JOB_REASONS
Expand Down
11 changes: 5 additions & 6 deletions core/karya/lib/karya/queue_store/in_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ module QueueStore
# Single-process reference implementation for queue submission and reservation behavior.
#
# InMemory is intentionally ephemeral and suitable for development, tests,
# examples, and as the executable reference for `QueueStore::Base`
# semantics. It is not a durable backend: jobs, queue indexes, reservations,
# examples, and as the executable reference for `QueueStore::Base` semantics.
# It is not a durable backend: jobs, queue indexes, reservations,
# active executions, retry state, and expired-token tombstones live only in
# process memory and are lost on restart. Production deployments that need
# durable enqueue acknowledgment or restart/takeover recovery must use a
# shared persistent backend implementing the Base durability contract.
class InMemory
# Owner-local implementation helpers for the executable reference store.
module Internal
end
module Internal; end

include Base
include Internal::BatchSupport
include Internal::BackpressureSupport
include Internal::BackpressureSnapshotSupport
include Internal::ChildWorkflowSupport
include Internal::DeadLetterSupport
include Internal::ExecutionSupport
include Internal::ExpirationSupport
Expand All @@ -60,8 +60,7 @@ module Internal
include Internal::UniquenessSupport
include Internal::WorkflowSupport

DEFAULT_EXPIRED_TOMBSTONE_LIMIT = 1024
DEFAULT_COMPLETED_BATCH_RETENTION_LIMIT = 1024
DEFAULT_EXPIRED_TOMBSTONE_LIMIT = DEFAULT_COMPLETED_BATCH_RETENTION_LIMIT = 1024
DEFAULT_MAX_BATCH_SIZE = 1000
RESERVE_QUEUES_ERROR_MESSAGE = 'provide exactly one of queue or queues'

Expand Down
3 changes: 3 additions & 0 deletions core/karya/lib/karya/queue_store/in_memory/internal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
require_relative 'internal/backpressure_support'
require_relative 'internal/backpressure_snapshot_support'
require_relative 'internal/batch_support'
require_relative 'internal/child_workflow_support'
require_relative 'internal/workflow_child_state'
require_relative 'internal/dead_letter_support'
require_relative 'internal/execution_recovery'
require_relative 'internal/execution_support'
Expand All @@ -24,4 +26,5 @@
require_relative 'internal/retry_support'
require_relative 'internal/store_state'
require_relative 'internal/uniqueness_support'
require_relative 'internal/workflow_child_ids'
require_relative 'internal/workflow_support'
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
# frozen_string_literal: true

# Copyright Codevedas Inc. 2025-present
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

module Karya
  module QueueStore
    class InMemory
      module Internal
        # Owner-local child workflow enqueue and lifecycle sync support.
        #
        # Mixed into InMemory alongside the other Internal::*Support modules.
        # Relies on sibling helpers defined elsewhere (`state`, `fetch_batch`,
        # `normalize_time`, `build_enqueue_batch`, uniqueness/reservation
        # helpers, `fetch_workflow_registration`) and reaches private
        # Karya::Workflow normalizers via `send`.
        module ChildWorkflowSupport
          # Enqueues one child workflow batch under the declared child-workflow
          # step `parent_step_id` of `parent_batch_id`.
          #
          # Under the store mutex it validates that the parent step declares a
          # child workflow, that `definition` matches the declared child
          # workflow id, that no child is registered for the step yet, that the
          # parent step job is still :queued, and that the child batch id
          # differs from the parent's. Returns a BulkMutationReport with
          # action :enqueue_child_workflow; raises
          # Workflow::InvalidExecutionError on any validation failure.
          def enqueue_child_workflow(
            parent_batch_id:,
            parent_step_id:,
            definition:,
            jobs_by_step_id:,
            batch_id:,
            now:,
            compensation_jobs_by_step_id: {}
          )
            # Normalize identifiers and the clock before taking the mutex;
            # Workflow keeps these normalizers private, hence the `send`.
            request = ChildWorkflowRequest.new(
              parent_batch_id: Workflow.send(:normalize_batch_identifier, :parent_batch_id, parent_batch_id),
              parent_step_id: Workflow.send(:normalize_execution_identifier, :parent_step_id, parent_step_id),
              now: normalize_time(:now, now, error_class: Workflow::InvalidExecutionError)
            )

            @mutex.synchronize do
              # Shadow the raw keyword values with their normalized forms.
              parent_batch_id = request.parent_batch_id
              parent_step_id = request.parent_step_id
              parent = prepare_child_workflow_parent(parent_batch_id:, parent_step_id:, definition:)
              binding = Workflow.send(:build_compensated_execution_binding, definition:, jobs_by_step_id:, batch_id:, compensation_jobs_by_step_id:)
              validate_child_batch_identity(parent_batch_id:, child_batch_id: binding.batch_id)
              enqueue_child_workflow_binding(parent:, parent_step_id:, binding:, definition:, now: request.now)
            end
          end

          # Folds terminal child workflow outcomes back into the parent batch:
          # each parent child-step job is dead-lettered when its child workflow
          # resolved :failed, cancelled when it resolved :cancelled, and
          # otherwise recorded as skipped with reason :ineligible_state.
          # Returns a BulkMutationReport with action :sync_child_workflows.
          def sync_child_workflows(parent_batch_id:, now:)
            request = ChildWorkflowSyncRequest.new(
              parent_batch_id: Workflow.send(:normalize_batch_identifier, :parent_batch_id, parent_batch_id),
              now: normalize_time(:now, now, error_class: Workflow::InvalidExecutionError)
            )

            @mutex.synchronize do
              parent_batch_id = request.parent_batch_id
              now = request.now
              # Fetching the registration asserts the parent batch is a known
              # workflow before any per-job work happens; the value is unused.
              fetch_workflow_registration(fetch_batch(parent_batch_id).id)
              relationships = child_relationships_for_parent_batch(parent_batch_id)
              Karya::Internal::BulkMutation::ReportBuilder.new(
                action: :sync_child_workflows,
                job_ids: relationships.map(&:parent_job_id),
                now:
              ).to_report do |job_id, changed_jobs, skipped_jobs|
                sync_child_workflow_job(job_id:, now:, changed_jobs:, skipped_jobs:)
              end
            end
          end

          private

          # Normalized child workflow enqueue request.
          class ChildWorkflowRequest
            attr_reader :now, :parent_batch_id, :parent_step_id

            def initialize(parent_batch_id:, parent_step_id:, now:)
              @now = now
              @parent_batch_id = parent_batch_id
              @parent_step_id = parent_step_id
            end
          end

          # Normalized child workflow sync request.
          class ChildWorkflowSyncRequest
            attr_reader :now, :parent_batch_id

            def initialize(parent_batch_id:, now:)
              @now = now
              @parent_batch_id = parent_batch_id
            end
          end

          # Groups the parent-side child workflow step identity.
          ParentChildWorkflow = Struct.new(:parent_workflow_id, :parent_batch_id, :parent_job_id)

          # Builds step-to-job metadata in definition order for child enqueues.
          class ChildStepJobIds
            def initialize(definition:, jobs:)
              @definition = definition
              @jobs = jobs
            end

            # Maps each step id to the id of the job at the same index.
            # NOTE(review): pairs steps and jobs strictly by position — assumes
            # `jobs` is in definition step order; confirm the binding builder
            # guarantees that ordering.
            def to_h
              definition.steps.each_with_object({}).with_index do |(workflow_step, step_jobs), index|
                step_jobs[workflow_step.id] = jobs.fetch(index).id
              end.freeze
            end

            private

            attr_reader :definition, :jobs
          end

          private_constant :ChildStepJobIds,
                           :ChildWorkflowRequest,
                           :ChildWorkflowSyncRequest,
                           :ParentChildWorkflow

          # Performs the enqueue once parent-side validation has passed:
          # batches the child jobs, queues them, records workflow
          # dependencies, registers child metadata, and builds the report.
          # Caller must hold @mutex.
          def enqueue_child_workflow_binding(parent:, parent_step_id:, binding:, definition:, now:)
            jobs = binding.jobs
            batch = build_enqueue_batch(batch_id: binding.batch_id, jobs:, now:)
            # Uniqueness is checked before any job is stored, so a rejection
            # here leaves no partial child state behind.
            validate_bulk_enqueue_uniqueness(jobs, now)
            expire_reservations_locked(now)
            queued_jobs = jobs.map { |job| enqueue_validated_job(job, now) }
            store_batch(batch)
            state.register_workflow_dependencies(binding.dependency_job_ids_by_job_id)
            ChildWorkflowMetadata.new(state:, parent:, parent_step_id:, binding:, definition:).register
            ChildWorkflowReport.new(binding:, queued_jobs:, now:).to_report
          end

          # Registers child workflow metadata after enqueue validation succeeds.
          class ChildWorkflowMetadata
            def initialize(state:, parent:, parent_step_id:, binding:, definition:)
              @state = state
              @parent = parent
              @parent_step_id = parent_step_id
              @binding = binding
              @definition = definition
            end

            # Registers the child batch as a workflow in its own right, then
            # links it to the parent step in the workflow_children registry.
            def register
              batch_id = binding.batch_id
              workflow_id = definition.id
              state.register_workflow(
                batch_id:,
                workflow_id:,
                step_job_ids: ChildStepJobIds.new(definition:, jobs: binding.jobs).to_h,
                dependency_job_ids_by_job_id: binding.dependency_job_ids_by_job_id,
                compensation_jobs_by_step_id: binding.compensation_jobs_by_step_id,
                child_workflow_ids_by_step_id: WorkflowChildIds.new(definition).to_h
              )
              state.workflow_children.register(
                parent_workflow_id: parent.parent_workflow_id,
                parent_batch_id: parent.parent_batch_id,
                parent_step_id:,
                parent_job_id: parent.parent_job_id,
                child_workflow_id: workflow_id,
                child_batch_id: batch_id
              )
            end

            private

            attr_reader :binding, :definition, :parent, :parent_step_id, :state
          end

          # Builds the public child workflow enqueue report.
          class ChildWorkflowReport
            def initialize(binding:, queued_jobs:, now:)
              @binding = binding
              @queued_jobs = queued_jobs
              @now = now
            end

            # skipped_jobs is always empty here: every job in the binding was
            # queued by the caller before this report is built.
            def to_report
              BulkMutationReport.new(
                action: :enqueue_child_workflow,
                performed_at: now,
                requested_job_ids: binding.jobs.map(&:id),
                changed_jobs: queued_jobs,
                skipped_jobs: []
              )
            end

            private

            attr_reader :binding, :now, :queued_jobs
          end

          private_constant :ChildWorkflowMetadata, :ChildWorkflowReport

          # Runs all parent-side validations; returns the frozen
          # ParentChildWorkflow identity produced by the last validation.
          def prepare_child_workflow_parent(parent_batch_id:, parent_step_id:, definition:)
            parent_batch = fetch_batch(parent_batch_id)
            batch_id = parent_batch.id
            parent_registration = fetch_workflow_registration(batch_id)
            expected_child_workflow_id = parent_registration.child_workflow_ids_by_step_id[parent_step_id]
            validate_child_workflow_parent_step(parent_step_id, expected_child_workflow_id)
            validate_child_workflow_definition(definition, expected_child_workflow_id, parent_step_id)
            validate_child_workflow_not_registered(batch_id, parent_step_id)
            validate_child_workflow_parent_job(parent_registration, parent_step_id, batch_id)
          end

          # The parent step must declare a child workflow id.
          def validate_child_workflow_parent_step(parent_step_id, expected_child_workflow_id)
            return if expected_child_workflow_id

            raise Workflow::InvalidExecutionError, "workflow step #{parent_step_id.inspect} is not a child workflow step"
          end

          # The supplied definition must be a Workflow::Definition whose id
          # matches the child workflow id declared on the parent step.
          def validate_child_workflow_definition(definition, expected_child_workflow_id, parent_step_id)
            raise Workflow::InvalidExecutionError, 'definition must be a Karya::Workflow::Definition' unless definition.is_a?(Workflow::Definition)

            workflow_id = definition.id
            return if expected_child_workflow_id == workflow_id

            raise Workflow::InvalidExecutionError, "child workflow #{workflow_id.inspect} does not match parent step #{parent_step_id.inspect}"
          end

          # At most one child workflow may be registered per parent step.
          def validate_child_workflow_not_registered(parent_batch_id, parent_step_id)
            return unless child_relationship(parent_batch_id, parent_step_id)

            raise Workflow::InvalidExecutionError, "child workflow already registered for step #{parent_step_id.inspect}"
          end

          # The parent step's job must still be :queued; on success returns
          # the frozen parent identity consumed by metadata registration.
          def validate_child_workflow_parent_job(parent_registration, parent_step_id, parent_batch_id)
            parent_job_id = parent_registration.step_job_ids.fetch(parent_step_id)
            parent_job = state.jobs_by_id.fetch(parent_job_id)
            raise Workflow::InvalidExecutionError, "parent child workflow step #{parent_step_id.inspect} must be queued" unless parent_job.state == :queued

            ParentChildWorkflow.new(parent_registration.workflow_id, parent_batch_id, parent_job_id).freeze
          end

          # A child batch may not reuse its parent's batch id.
          def validate_child_batch_identity(parent_batch_id:, child_batch_id:)
            return unless parent_batch_id == child_batch_id

            raise Workflow::InvalidExecutionError, 'child workflow batch id must differ from parent batch id'
          end

          # Relationship registered for one (parent batch, parent step) pair.
          def child_relationship(parent_batch_id, parent_step_id)
            state.workflow_children.for_parent_step(parent_batch_id, parent_step_id)
          end

          # All child relationships registered under one parent batch.
          def child_relationships_for_parent_batch(parent_batch_id)
            state.workflow_children.for_parent_batch(parent_batch_id)
          end

          # Applies one child workflow's resolved state to its parent job:
          # :failed dead-letters the parent job, :cancelled cancels it, and
          # any other state (presumably still-running or succeeded children)
          # records an :ineligible_state skip.
          def sync_child_workflow_job(job_id:, now:, changed_jobs:, skipped_jobs:)
            relationship = state.workflow_children.for_parent_job(job_id)
            child_batch_id = relationship.child_batch_id
            case child_workflow_state(child_batch_id, now:)
            when :failed
              dead_letter_requested_job(job_id, now, "child workflow #{child_batch_id} failed", changed_jobs, skipped_jobs)
            when :cancelled
              cancel_requested_job(job_id, now, changed_jobs, skipped_jobs)
            else
              parent_job = state.jobs_by_id.fetch(job_id)
              skipped_jobs << Karya::Internal::BulkMutation::SkippedJob.new(job_id:, reason: :ineligible_state, state: parent_job.state).to_h
            end
          end

          # Resolves the child batch's workflow state via WorkflowChildState.
          def child_workflow_state(child_batch_id, now:)
            batch_id = fetch_batch(child_batch_id).id
            WorkflowChildState.new(state:, now:).resolve(batch_id)
          end
        end
      end
    end
  end
end
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def matching_job_for(queue, handler_matcher, reserve_scan_state, now)

def reservable_candidate?(queued_job, handler_matcher, reserve_scan_state, now)
return false unless handler_matcher.include?(queued_job.handler)
return false unless workflow_dependencies_satisfied?(queued_job)
return false unless workflow_dependencies_satisfied?(queued_job, now:)
return false if reserve_scan_state.concurrency_blocked?(queued_job)
return false if reserve_scan_state.rate_limited?(queued_job)
return false if circuit_breaker_blocked?(queued_job, now)
Expand Down
Loading
Loading