Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions core/karya/lib/karya/queue_store/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,32 @@ def workflow_snapshot(batch_id:, now:)
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Execute one explicit workflow state query.
def query_workflow(batch_id:, query:, now:)
_batch_id = batch_id
_query = query
_now = now
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Persist one workflow signal delivery for a nonterminal workflow batch.
def deliver_workflow_signal(batch_id:, signal:, payload:, now:)
_batch_id = batch_id
_signal = signal
_payload = payload
_now = now
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Persist one workflow external event delivery for a nonterminal workflow batch.
def deliver_workflow_event(batch_id:, event:, payload:, now:)
_batch_id = batch_id
_event = event
_payload = payload
_now = now
raise NotImplementedError, "#{self.class} must implement ##{__method__}"
end

# Enqueue a child workflow batch for one declared parent child step.
def enqueue_child_workflow(
parent_batch_id:,
Expand Down
5 changes: 4 additions & 1 deletion core/karya/lib/karya/queue_store/bulk_mutation_report.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class BulkMutationReport
replay_dead_letter_jobs
retry_dead_letter_jobs
discard_dead_letter_jobs
deliver_workflow_signal
deliver_workflow_event
enqueue_child_workflow
rollback_workflow
sync_child_workflows
Expand Down Expand Up @@ -130,7 +132,8 @@ def skipped_reason_error_message

def action_error_message
'action must be one of :enqueue_many, :retry_jobs, :cancel_jobs, :dead_letter_jobs, ' \
':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, :enqueue_child_workflow, :rollback_workflow, ' \
':replay_dead_letter_jobs, :retry_dead_letter_jobs, :discard_dead_letter_jobs, :deliver_workflow_signal, ' \
':deliver_workflow_event, :enqueue_child_workflow, :rollback_workflow, ' \
':retry_workflow_steps, :dead_letter_workflow_steps, :replay_workflow_steps, ' \
':retry_dead_letter_workflow_steps, :discard_workflow_steps, or :sync_child_workflows'
end
Expand Down
3 changes: 3 additions & 0 deletions core/karya/lib/karya/queue_store/in_memory/internal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@
require_relative 'internal/store_state'
require_relative 'internal/uniqueness_support'
require_relative 'internal/workflow_child_ids'
require_relative 'internal/workflow_interaction_requirements'
require_relative 'internal/workflow_interactions_support'
require_relative 'internal/workflow_query'
require_relative 'internal/workflow_support'
194 changes: 180 additions & 14 deletions core/karya/lib/karya/queue_store/in_memory/internal/store_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ module Internal
# Internal mutable state for the single-process queue store.
class StoreState
MAX_TRACKED_FAIR_QUEUE_LISTS = 128

attr_reader :executions_by_token,
:batches_by_id,
:batch_id_by_job_id,
:breaker_failures_by_scope,
:breaker_states_by_scope,
:execution_tokens_in_order,
Expand All @@ -34,6 +34,7 @@ class StoreState
:stuck_job_recoveries_by_id,
:workflow_children,
:workflow_dependency_job_ids_by_job_id,
:workflow_interactions,
:workflow_rollback_batch_ids,
:workflow_registrations_by_batch_id,
:workflow_rollbacks_by_batch_id
Expand All @@ -43,12 +44,32 @@ class StoreState
:workflow_id,
:step_job_ids,
:dependency_job_ids_by_job_id,
:interaction_requirements_by_job_id,
:interaction_supported_keys,
:compensation_jobs_by_step_id,
:child_workflow_ids_by_step_id
)
) do
def self.build(
workflow_id:,
step_job_ids:,
dependency_job_ids_by_job_id:,
interaction_requirements_by_job_id:,
compensation_jobs_by_step_id:,
child_workflow_ids_by_step_id:
)
new(
workflow_id,
step_job_ids.dup.freeze,
dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze }.freeze,
interaction_requirements_by_job_id.transform_values { |requirement| requirement.dup.freeze }.freeze,
interaction_requirements_by_job_id.values.to_h { |requirement| [[requirement.fetch(:kind), requirement.fetch(:name)], true] }.freeze,
compensation_jobs_by_step_id.dup.freeze,
child_workflow_ids_by_step_id.dup.freeze
)
end
end
# Immutable owner-local rollback metadata for one workflow batch.
WorkflowRollback = Struct.new(:batch_id, :rollback_batch_id, :reason, :requested_at, :compensation_job_ids)

# Owner-local child workflow relationship registry.
class WorkflowChildren
# Immutable owner-local child workflow relationship metadata.
Expand Down Expand Up @@ -156,6 +177,116 @@ def delete_relationship(relationship, remove_parent_batch: true)
end
end

# Owner-local workflow interaction inbox keyed by workflow batch id.
class WorkflowInteractions
EMPTY = [].freeze
MAX_INTERACTIONS_PER_BATCH = 100
private_constant :EMPTY, :MAX_INTERACTIONS_PER_BATCH

def initialize
@by_batch_id = {}
@max_interactions_per_batch = MAX_INTERACTIONS_PER_BATCH
end

def for_batch(batch_id)
with_inbox(batch_id, fallback: EMPTY, &:to_a)
end

def includes?(batch_id:, kind:, name:)
with_inbox(batch_id, fallback: false) { |inbox| inbox.includes?(kind:, name:) }
end

def received_at_for(batch_id:, kind:, name:)
with_inbox(batch_id, fallback: nil) { |inbox| inbox.received_at_for(kind:, name:) }
end
Comment thread
niteshpurohit marked this conversation as resolved.

def register(batch_id:, interaction:)
current_inbox(batch_id).append(interaction).to_a
end
Comment thread
niteshpurohit marked this conversation as resolved.

def configure(batch_id:, supported_keys:)
current_inbox(batch_id).configure(supported_keys:)
end
Comment thread
niteshpurohit marked this conversation as resolved.

def delete_by_batch(batch_id)
with_inbox(batch_id, fallback: EMPTY, delete: true, &:to_a)
end

private

attr_reader :max_interactions_per_batch

def current_inbox(batch_id)
@by_batch_id[batch_id] ||= Inbox.new(max_size: max_interactions_per_batch)
end

def with_inbox(batch_id, fallback:, delete: false)
inbox = delete ? @by_batch_id.delete(batch_id) : @by_batch_id[batch_id]
return fallback unless inbox

yield inbox
end

# Owner-local bounded interaction buffer for one workflow batch.
class Inbox
def initialize(max_size:)
@max_size = max_size
@interactions = []
@to_a = EMPTY
@received_at_by_key = {}
@supported_keys = {}.freeze
end

def append(interaction)
interactions << interaction
track([interaction.kind, interaction.name], interaction.received_at)
interactions.shift if interactions.length > max_size
@to_a = nil
Comment thread
niteshpurohit marked this conversation as resolved.
Comment thread
niteshpurohit marked this conversation as resolved.
self
end

def configure(supported_keys:)
normalized_supported_keys =
if supported_keys.is_a?(Hash)
supported_keys.keys.to_h { |key| [key, true] }
else
supported_keys.to_h { |key| [key, true] }
end

@supported_keys = normalized_supported_keys.freeze
rebuild_received_at_index
self
end

def to_a
@to_a ||= interactions.dup.freeze
end

def includes?(kind:, name:)
received_at_by_key.key?([kind, name])
end

def received_at_for(kind:, name:)
received_at_by_key[[kind, name]]
end

private

attr_reader :interactions, :max_size, :received_at_by_key, :supported_keys

def rebuild_received_at_index
received_at_by_key.clear
interactions.each { |interaction| track([interaction.kind, interaction.name], interaction.received_at) }
end

def track(key, received_at)
return unless supported_keys.key?(key)

received_at_by_key[key] = received_at
end
end
end

# Decides whether a terminal child batch must remain because its parent is still active.
class ChildBatchRetention
def initialize(batches_by_id:, workflow_children:, terminal_batch:)
Expand Down Expand Up @@ -237,6 +368,10 @@ def workflow_children
workflow_indexes.fetch(:workflow_children)
end

def workflow_interactions
workflow_indexes.fetch(:workflow_interactions)
end

def workflow_dependency_job_ids_by_job_id
workflow_indexes.fetch(:workflow_dependency_job_ids_by_job_id)
end
Expand All @@ -263,6 +398,7 @@ def cleanup_batch(batch_id:, batch:)
},
workflow_indexes: {
workflow_children:,
workflow_interactions:,
workflow_rollback_batch_ids:,
workflow_registrations_by_batch_id:,
workflow_rollbacks_by_batch_id:
Expand All @@ -279,16 +415,19 @@ def register_workflow(
step_job_ids:,
dependency_job_ids_by_job_id:,
compensation_jobs_by_step_id:,
interaction_requirements_by_job_id: {},
child_workflow_ids_by_step_id: {}
)
registration = WorkflowRegistration.new(
workflow_id,
step_job_ids.dup.freeze,
dependency_job_ids_by_job_id.transform_values { |dependency_job_ids| dependency_job_ids.dup.freeze }.freeze,
compensation_jobs_by_step_id.dup.freeze,
child_workflow_ids_by_step_id.dup.freeze
registration = WorkflowRegistration.build(
workflow_id:,
step_job_ids:,
dependency_job_ids_by_job_id:,
interaction_requirements_by_job_id:,
compensation_jobs_by_step_id:,
child_workflow_ids_by_step_id:
).freeze
workflow_registrations_by_batch_id[batch_id] = registration
workflow_interactions.configure(batch_id:, supported_keys: registration.interaction_supported_keys)
child_workflow_ids_by_step_id.each do |step_id, child_workflow_id|
workflow_children.register_expected_child(step_job_ids.fetch(step_id), child_workflow_id)
end
Expand All @@ -301,10 +440,26 @@ def register_workflow_dependencies(dependency_job_ids_by_job_id)
)
end

def register_workflow_interaction(batch_id:, interaction:)
workflow_interactions.register(batch_id:, interaction:)
end

def workflow_dependency_job_ids_for(job_id)
workflow_dependency_job_ids_by_job_id[job_id]
end

def workflow_interactions_for(batch_id)
workflow_interactions.for_batch(batch_id)
end

def workflow_interaction_delivered?(batch_id:, kind:, name:)
workflow_interactions.includes?(batch_id:, kind:, name:)
end

def workflow_interaction_received_at(batch_id:, kind:, name:)
workflow_interactions.received_at_for(batch_id:, kind:, name:)
end

def register_workflow_rollback(batch_id:, rollback_batch_id:, reason:, requested_at:, compensation_job_ids:)
workflow_rollback_batch_ids[rollback_batch_id] = true
workflow_rollbacks_by_batch_id[batch_id] = WorkflowRollback.new(
Expand All @@ -322,6 +477,7 @@ def register_workflow_rollback(batch_id:, rollback_batch_id:, reason:, requested
private_constant :ChildBatchRetention,
:TerminalBatchPruner,
:WorkflowChildren,
:WorkflowInteractions,
:WorkflowMetadata,
:WorkflowRegistration,
:WorkflowRollback
Expand Down Expand Up @@ -351,11 +507,15 @@ def initialize(expired_tombstone_limit:)
@stuck_job_recoveries_by_id = {}
@terminal_batch_ids_index = {}
@terminal_batch_ids_in_order = []
@workflow_children = WorkflowChildren.new
@workflow_dependency_job_ids_by_job_id = {}
@workflow_rollback_batch_ids = {}
@workflow_registrations_by_batch_id = {}
@workflow_rollbacks_by_batch_id = {}
workflow_state = {
workflow_children: WorkflowChildren.new,
workflow_dependency_job_ids_by_job_id: {},
workflow_interactions: WorkflowInteractions.new,
workflow_rollback_batch_ids: {},
workflow_registrations_by_batch_id: {},
workflow_rollbacks_by_batch_id: {}
}
workflow_state.each { |name, value| instance_variable_set(:"@#{name}", value) }
end

def queue_job_ids_for(queue)
Expand Down Expand Up @@ -519,6 +679,7 @@ def prune_terminal_batches(retention_limit, changed_job: nil)
workflow_indexes: {
workflow_dependency_job_ids_by_job_id:,
workflow_children:,
workflow_interactions:,
workflow_rollback_batch_ids:,
workflow_registrations_by_batch_id:,
workflow_rollbacks_by_batch_id:
Expand Down Expand Up @@ -606,6 +767,7 @@ def cleanup_stale_batch_membership
def cleanup_workflow_registration
registration = workflow_registrations_by_batch_id.delete(batch_id)
rollback = workflow_rollbacks_by_batch_id.delete(batch_id)
workflow_interactions.delete_by_batch(batch_id)
cleanup_child_workflows(registration)
workflow_rollback_batch_ids.delete(rollback.rollback_batch_id) if rollback
registration
Expand Down Expand Up @@ -656,6 +818,10 @@ def workflow_rollback_batch_ids
workflow_indexes.fetch(:workflow_rollback_batch_ids)
end

def workflow_interactions
workflow_indexes.fetch(:workflow_interactions)
end

def workflow_children
workflow_indexes.fetch(:workflow_children)
end
Expand Down
Loading
Loading