From 15e478b0ff2acc6d476b63aa66bf4f12b1384711 Mon Sep 17 00:00:00 2001 From: Sergei Alekseenko Date: Thu, 19 Mar 2026 19:52:52 +0400 Subject: [PATCH 1/3] Migrate background jobs from Resque to Solid Queue --- Gemfile | 5 +- Procfile | 3 +- app/jobs/base_bundle_job.rb | 4 +- app/jobs/concerns/nested_job_tracker.rb | 36 ++++ app/jobs/document_gdoc_job.rb | 2 +- app/jobs/document_parse_job.rb | 2 +- app/jobs/document_pdf_job.rb | 2 +- app/jobs/integrations/webhook_call_job.rb | 2 +- app/jobs/job_result_cleanup_job.rb | 9 + app/jobs/material_gdoc_job.rb | 2 +- app/jobs/material_parse_job.rb | 2 +- app/jobs/material_pdf_job.rb | 2 +- app/models/job_result.rb | 10 ++ bin/jobs | 6 + config/application.rb | 4 +- config/environments/development.rb | 4 +- config/environments/production.rb | 2 +- config/environments/qa.rb | 4 +- config/recurring.yml | 8 + config/routes.rb | 5 +- config/solid_queue.yml | 18 ++ .../20260319000001_create_job_results.rb | 17 ++ ...0260319000002_create_solid_queue_tables.rb | 141 +++++++++++++++ db/queue_schema.rb | 129 ++++++++++++++ docker-compose.yml | 17 ++ lib/job_tracker.rb | 161 ++++++++++++++++++ spec/jobs/concerns/nested_job_tracker_spec.rb | 78 +++++++++ spec/jobs/concerns/unit_level_job_spec.rb | 2 +- spec/jobs/document_gdoc_job_spec.rb | 4 +- spec/jobs/document_pdf_job_spec.rb | 4 +- spec/jobs/material_gdoc_job_spec.rb | 4 +- spec/jobs/material_pdf_job_spec.rb | 4 +- spec/lib/job_tracker_spec.rb | 105 ++++++++++++ spec/models/job_result_spec.rb | 37 ++++ 34 files changed, 804 insertions(+), 31 deletions(-) create mode 100644 app/jobs/concerns/nested_job_tracker.rb create mode 100644 app/jobs/job_result_cleanup_job.rb create mode 100644 app/models/job_result.rb create mode 100755 bin/jobs create mode 100644 config/recurring.yml create mode 100644 config/solid_queue.yml create mode 100644 db/migrate/20260319000001_create_job_results.rb create mode 100644 db/migrate/20260319000002_create_solid_queue_tables.rb create mode 100644 
db/queue_schema.rb create mode 100644 lib/job_tracker.rb create mode 100644 spec/jobs/concerns/nested_job_tracker_spec.rb create mode 100644 spec/lib/job_tracker_spec.rb create mode 100644 spec/models/job_result_spec.rb diff --git a/Gemfile b/Gemfile index 9c48ec8..fbbca2a 100644 --- a/Gemfile +++ b/Gemfile @@ -84,10 +84,11 @@ gem "devise", "~> 5.0" # Background Jobs & Queue gem "redis" gem "hiredis-client" -gem "resque" -gem "resque-scheduler", "~> 5.0" +gem "resque" # Kept temporarily for draining in-flight jobs +gem "resque-scheduler", "~> 5.0" # Kept temporarily for draining in-flight jobs gem "activejob-retry", "~> 0.6.3" gem "concurrent-ruby", "~> 1.3" +gem "mission_control-jobs" # Search & Full-text gem "elasticsearch-model", "~> 8.0" diff --git a/Procfile b/Procfile index fdae890..785cd84 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1 @@ -worker: bundle exec rake resque:work QUEUE=* -scheduler: bundle exec rake resque:scheduler \ No newline at end of file +worker: bundle exec rake solid_queue:start diff --git a/app/jobs/base_bundle_job.rb b/app/jobs/base_bundle_job.rb index 283ac8e..8335abd 100644 --- a/app/jobs/base_bundle_job.rb +++ b/app/jobs/base_bundle_job.rb @@ -5,8 +5,8 @@ # Shouldn't be called itself, constants should be defined in inherited classes # class BaseBundleJob < ApplicationJob - include ResqueJob - include NestedResqueJob + include JobTracker + include NestedJobTracker def perform(_entry_id, _options = {}) raise NotImplementedError diff --git a/app/jobs/concerns/nested_job_tracker.rb b/app/jobs/concerns/nested_job_tracker.rb new file mode 100644 index 0000000..7a2e686 --- /dev/null +++ b/app/jobs/concerns/nested_job_tracker.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module NestedJobTracker + extend ActiveSupport::Concern + + class_methods do + def queued_or_running_nested?(job_id, current_job_id = "-1") + check_child = ->(j) { j["arguments"][1]&.dig("initial_job_id") == job_id && j["job_id"] != current_job_id } + job_klasses = 
self::NESTED_JOBS + [name] + job_klasses.each do |job_klass| + queued = find_in_queue_by_payload(job_klass, &check_child) || + find_in_working_by_payload(job_klass, &check_child) + return true if queued.present? + end + false + end + + def status_nested(jid) + self_status = status(jid) + return self_status unless self_status == :done + return :running if queued_or_running_nested?(jid) + + :done + end + + def fetch_result_nested(jid) + JobResult.for_parent(jid).pluck(:result) + end + end + + private + + def initial_job_id + @initial_job_id ||= options[:initial_job_id].presence || job_id + end +end diff --git a/app/jobs/document_gdoc_job.rb b/app/jobs/document_gdoc_job.rb index d92ef3c..3992af1 100644 --- a/app/jobs/document_gdoc_job.rb +++ b/app/jobs/document_gdoc_job.rb @@ -17,7 +17,7 @@ # class DocumentGdocJob < ApplicationJob include DocumentRescuableJob - include ResqueJob + include JobTracker queue_as :default diff --git a/app/jobs/document_parse_job.rb b/app/jobs/document_parse_job.rb index 0795178..0c7ba39 100644 --- a/app/jobs/document_parse_job.rb +++ b/app/jobs/document_parse_job.rb @@ -3,7 +3,7 @@ require "lt/google/api/auth/cli" class DocumentParseJob < ApplicationJob - include ResqueJob + include JobTracker include RetryDelayed queue_as :default diff --git a/app/jobs/document_pdf_job.rb b/app/jobs/document_pdf_job.rb index bfeda59..cb06dd1 100644 --- a/app/jobs/document_pdf_job.rb +++ b/app/jobs/document_pdf_job.rb @@ -2,7 +2,7 @@ class DocumentPdfJob < ApplicationJob include DocumentRescuableJob - include ResqueJob + include JobTracker queue_as :default diff --git a/app/jobs/integrations/webhook_call_job.rb b/app/jobs/integrations/webhook_call_job.rb index f5f08b4..9719c0f 100644 --- a/app/jobs/integrations/webhook_call_job.rb +++ b/app/jobs/integrations/webhook_call_job.rb @@ -2,7 +2,7 @@ module Integrations class WebhookCallJob < ApplicationJob - extend ResqueJob + include JobTracker include RetryDelayed queue_as :low diff --git 
a/app/jobs/job_result_cleanup_job.rb b/app/jobs/job_result_cleanup_job.rb new file mode 100644 index 0000000..9587bf7 --- /dev/null +++ b/app/jobs/job_result_cleanup_job.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class JobResultCleanupJob < ApplicationJob + queue_as :default + + def perform + JobResult.cleanup_expired + end +end diff --git a/app/jobs/material_gdoc_job.rb b/app/jobs/material_gdoc_job.rb index a8e435e..0451c37 100644 --- a/app/jobs/material_gdoc_job.rb +++ b/app/jobs/material_gdoc_job.rb @@ -20,7 +20,7 @@ # class MaterialGdocJob < ApplicationJob include MaterialRescuableJob - include ResqueJob + include JobTracker queue_as :default diff --git a/app/jobs/material_parse_job.rb b/app/jobs/material_parse_job.rb index 66cdd91..2491263 100644 --- a/app/jobs/material_parse_job.rb +++ b/app/jobs/material_parse_job.rb @@ -3,7 +3,7 @@ require "lt/google/api/auth/cli" class MaterialParseJob < ApplicationJob - include ResqueJob + include JobTracker queue_as :default diff --git a/app/jobs/material_pdf_job.rb b/app/jobs/material_pdf_job.rb index 562887c..42db6be 100644 --- a/app/jobs/material_pdf_job.rb +++ b/app/jobs/material_pdf_job.rb @@ -2,7 +2,7 @@ class MaterialPdfJob < ApplicationJob include MaterialRescuableJob - include ResqueJob + include JobTracker queue_as :default diff --git a/app/models/job_result.rb b/app/models/job_result.rb new file mode 100644 index 0000000..772cd8a --- /dev/null +++ b/app/models/job_result.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +class JobResult < ApplicationRecord + scope :expired, -> { where(created_at: ...1.hour.ago) } + scope :for_parent, ->(jid) { where(parent_job_id: jid) } + + def self.cleanup_expired + expired.delete_all + end +end diff --git a/bin/jobs b/bin/jobs new file mode 100755 index 0000000..dcf59f3 --- /dev/null +++ b/bin/jobs @@ -0,0 +1,6 @@ +#!/usr/bin/env ruby + +require_relative "../config/environment" +require "solid_queue/cli" + +SolidQueue::Cli.start(ARGV) diff --git 
a/config/application.rb b/config/application.rb index fc9217e..6747d91 100644 --- a/config/application.rb +++ b/config/application.rb @@ -58,8 +58,8 @@ class Application < Rails::Application Rails.root.join("lib") ] - # Queue adapter configuration (Resque) - config.active_job.queue_adapter = :resque + # Queue adapter configuration (Solid Queue) + config.active_job.queue_adapter = :solid_queue # Asset paths configuration for fonts and icons config.assets.paths << Rails.root.join("node_modules/@fortawesome/fontawesome-free/webfonts") diff --git a/config/environments/development.rb b/config/environments/development.rb index 1442deb..3b41125 100644 --- a/config/environments/development.rb +++ b/config/environments/development.rb @@ -55,8 +55,8 @@ # Highlight code that enqueued background job in logs. config.active_job.verbose_enqueue_logs = true - # Use Resque as the ActiveJob queue adapter - config.active_job.queue_adapter = :resque + # Use Solid Queue as the ActiveJob queue adapter + config.active_job.queue_adapter = :solid_queue # Highlight code that triggered redirect in logs. config.action_dispatch.verbose_redirect_logs = true diff --git a/config/environments/production.rb b/config/environments/production.rb index a047064..04c7cf4 100644 --- a/config/environments/production.rb +++ b/config/environments/production.rb @@ -50,7 +50,7 @@ # config.cache_store = :mem_cache_store # Replace the default in-process and non-durable queuing backend for Active Job. - config.active_job.queue_adapter = :resque + config.active_job.queue_adapter = :solid_queue # Ignore bad email addresses and do not raise email delivery errors. # Set this to true and configure the email server for immediate delivery to raise delivery errors. diff --git a/config/environments/qa.rb b/config/environments/qa.rb index ff26385..2566954 100644 --- a/config/environments/qa.rb +++ b/config/environments/qa.rb @@ -55,8 +55,8 @@ # Highlight code that enqueued background job in logs. 
config.active_job.verbose_enqueue_logs = true - # Use Resque as the ActiveJob queue adapter - config.active_job.queue_adapter = :resque + # Use Solid Queue as the ActiveJob queue adapter + config.active_job.queue_adapter = :solid_queue # Highlight code that triggered redirect in logs. config.action_dispatch.verbose_redirect_logs = true diff --git a/config/recurring.yml b/config/recurring.yml new file mode 100644 index 0000000..d987b35 --- /dev/null +++ b/config/recurring.yml @@ -0,0 +1,8 @@ +production: + cleanup_job_results: + class: JobResultCleanupJob + schedule: every hour + + clear_solid_queue_finished_jobs: + command: "SolidQueue::Job.clear_finished_in_batches(sleep_between_batches: 0.3)" + schedule: every hour at minute 12 diff --git a/config/routes.rb b/config/routes.rb index 322ebb9..1709ac9 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -22,9 +22,10 @@ registrations: "registrations" } - # Resque dashboard (behind authentication) + # Job monitoring dashboards (behind authentication) authenticate :user do - mount Resque::Server, at: "/queue" + mount MissionControl::Jobs::Engine, at: "/jobs" + mount Resque::Server, at: "/queue" # Kept temporarily for draining monitoring end namespace :admin do diff --git a/config/solid_queue.yml b/config/solid_queue.yml new file mode 100644 index 0000000..9eace59 --- /dev/null +++ b/config/solid_queue.yml @@ -0,0 +1,18 @@ +default: &default + dispatchers: + - polling_interval: 1 + batch_size: 500 + workers: + - queues: "*" + threads: 3 + processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %> + polling_interval: 0.1 + +development: + <<: *default + +test: + <<: *default + +production: + <<: *default diff --git a/db/migrate/20260319000001_create_job_results.rb b/db/migrate/20260319000001_create_job_results.rb new file mode 100644 index 0000000..284fbab --- /dev/null +++ b/db/migrate/20260319000001_create_job_results.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class CreateJobResults < ActiveRecord::Migration[8.1] 
+ def change + create_table :job_results do |t| + t.string :job_id, null: false + t.string :parent_job_id + t.string :job_class, null: false + t.jsonb :result, default: {} + t.timestamps + end + + add_index :job_results, :job_id, unique: true + add_index :job_results, :parent_job_id + add_index :job_results, [:parent_job_id, :job_class] + end +end diff --git a/db/migrate/20260319000002_create_solid_queue_tables.rb b/db/migrate/20260319000002_create_solid_queue_tables.rb new file mode 100644 index 0000000..7b12046 --- /dev/null +++ b/db/migrate/20260319000002_create_solid_queue_tables.rb @@ -0,0 +1,141 @@ +# frozen_string_literal: true + +class CreateSolidQueueTables < ActiveRecord::Migration[8.1] + def change + create_table :solid_queue_jobs do |t| + t.string :queue_name, null: false + t.string :class_name, null: false + t.text :arguments + t.integer :priority, default: 0, null: false + t.string :active_job_id + t.datetime :scheduled_at + t.datetime :finished_at + t.string :concurrency_key + t.timestamps + + t.index :active_job_id + t.index :class_name + t.index :finished_at + t.index [:queue_name, :finished_at], name: "index_solid_queue_jobs_for_filtering" + t.index [:scheduled_at, :finished_at], name: "index_solid_queue_jobs_for_alerting" + end + + create_table :solid_queue_scheduled_executions do |t| + t.bigint :job_id, null: false + t.string :queue_name, null: false + t.integer :priority, default: 0, null: false + t.datetime :scheduled_at, null: false + t.datetime :created_at, null: false + + t.index :job_id, unique: true + t.index [:scheduled_at, :priority, :job_id], name: "index_solid_queue_dispatch_all" + end + + create_table :solid_queue_ready_executions do |t| + t.bigint :job_id, null: false + t.string :queue_name, null: false + t.integer :priority, default: 0, null: false + t.datetime :created_at, null: false + + t.index :job_id, unique: true + t.index [:priority, :job_id], name: "index_solid_queue_poll_all" + t.index [:queue_name, :priority, :job_id], 
name: "index_solid_queue_poll_by_queue" + end + + create_table :solid_queue_claimed_executions do |t| + t.bigint :job_id, null: false + t.bigint :process_id + t.datetime :created_at, null: false + + t.index :job_id, unique: true + t.index [:process_id, :job_id] + end + + create_table :solid_queue_blocked_executions do |t| + t.bigint :job_id, null: false + t.string :queue_name, null: false + t.integer :priority, default: 0, null: false + t.string :concurrency_key, null: false + t.datetime :expires_at, null: false + t.datetime :created_at, null: false + + t.index [:concurrency_key, :priority, :job_id], name: "index_solid_queue_blocked_executions_for_release" + t.index [:expires_at, :concurrency_key], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index :job_id, unique: true + end + + create_table :solid_queue_failed_executions do |t| + t.bigint :job_id, null: false + t.text :error + t.datetime :created_at, null: false + + t.index :job_id, unique: true + end + + create_table :solid_queue_pauses do |t| + t.string :queue_name, null: false + t.datetime :created_at, null: false + + t.index :queue_name, unique: true + end + + create_table :solid_queue_processes do |t| + t.string :kind, null: false + t.datetime :last_heartbeat_at, null: false + t.bigint :supervisor_id + t.integer :pid, null: false + t.string :hostname + t.text :metadata + t.datetime :created_at, null: false + t.string :name, null: false + + t.index :last_heartbeat_at + t.index [:name, :supervisor_id], unique: true + t.index :supervisor_id + end + + create_table :solid_queue_semaphores do |t| + t.string :key, null: false + t.integer :value, default: 1, null: false + t.datetime :expires_at, null: false + t.timestamps + + t.index :expires_at + t.index [:key, :value] + t.index :key, unique: true + end + + create_table :solid_queue_recurring_executions do |t| + t.bigint :job_id, null: false + t.string :task_key, null: false + t.datetime :run_at, null: false + t.datetime :created_at, null: false 
+ + t.index :job_id, unique: true + t.index [:task_key, :run_at], unique: true + end + + create_table :solid_queue_recurring_tasks do |t| + t.string :key, null: false + t.string :schedule, null: false + t.string :command, limit: 2048 + t.string :class_name + t.text :arguments + t.string :queue_name + t.integer :priority, default: 0 + t.boolean :static, default: true, null: false + t.text :description + t.timestamps + + t.index :key, unique: true + t.index :static + end + + add_foreign_key :solid_queue_blocked_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + add_foreign_key :solid_queue_claimed_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + add_foreign_key :solid_queue_failed_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + add_foreign_key :solid_queue_ready_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + add_foreign_key :solid_queue_recurring_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + add_foreign_key :solid_queue_scheduled_executions, :solid_queue_jobs, column: :job_id, on_delete: :cascade + end +end diff --git a/db/queue_schema.rb b/db/queue_schema.rb new file mode 100644 index 0000000..85194b6 --- /dev/null +++ b/db/queue_schema.rb @@ -0,0 +1,129 @@ +ActiveRecord::Schema[7.1].define(version: 1) do + create_table "solid_queue_blocked_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.index [ "concurrency_key", "priority", "job_id" ], name: "index_solid_queue_blocked_executions_for_release" + t.index [ "expires_at", "concurrency_key" ], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index [ "job_id" ], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table 
"solid_queue_claimed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.bigint "process_id" + t.datetime "created_at", null: false + t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.text "error" + t.datetime "created_at", null: false + t.index [ "job_id" ], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", force: :cascade do |t| + t.string "queue_name", null: false + t.string "class_name", null: false + t.text "arguments" + t.integer "priority", default: 0, null: false + t.string "active_job_id" + t.datetime "scheduled_at" + t.datetime "finished_at" + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id" + t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name" + t.index [ "finished_at" ], name: "index_solid_queue_jobs_on_finished_at" + t.index [ "queue_name", "finished_at" ], name: "index_solid_queue_jobs_for_filtering" + t.index [ "scheduled_at", "finished_at" ], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", force: :cascade do |t| + t.string "queue_name", null: false + t.datetime "created_at", null: false + t.index [ "queue_name" ], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", force: :cascade do |t| + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.bigint "supervisor_id" + t.integer "pid", null: false + t.string "hostname" + t.text "metadata" + t.datetime "created_at", null: false + t.string "name", null: false + t.index [ "last_heartbeat_at" 
], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime "created_at", null: false + t.index [ "job_id" ], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index [ "priority", "job_id" ], name: "index_solid_queue_poll_all" + t.index [ "queue_name", "priority", "job_id" ], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "task_key", null: false + t.datetime "run_at", null: false + t.datetime "created_at", null: false + t.index [ "job_id" ], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index [ "task_key", "run_at" ], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true, null: false + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index [ "key" ], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index [ "static" ], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", force: :cascade do |t| + t.bigint "job_id", null: false + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.datetime 
"scheduled_at", null: false + t.datetime "created_at", null: false + t.index [ "job_id" ], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index [ "scheduled_at", "priority", "job_id" ], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index [ "expires_at" ], name: "index_solid_queue_semaphores_on_expires_at" + t.index [ "key", "value" ], name: "index_solid_queue_semaphores_on_key_and_value" + t.index [ "key" ], name: "index_solid_queue_semaphores_on_key", unique: true + end + + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade +end diff --git a/docker-compose.yml b/docker-compose.yml index f7f3eeb..4a11e3d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,21 @@ services: - .env.docker - .env.development + solid_queue: + image: lcms-core:dev + command: | + bash -c "bundle install && bundle exec rake solid_queue:start" + volumes: + - .:/app + - bundle:/usr/local/bundle + depends_on: + - db + - redis + env_file: + - .env.docker + - .env.development + + # Keep resque temporarily to drain in-flight jobs during migration resque: image: lcms-core:dev command: | @@ -52,6 
+67,8 @@ services: environment: - TERM_CHILD=1 - RESQUE_TERM_TIMEOUT=10 + profiles: + - resque-drain css: image: lcms-core:dev diff --git a/lib/job_tracker.rb b/lib/job_tracker.rb new file mode 100644 index 0000000..e5dd07a --- /dev/null +++ b/lib/job_tracker.rb @@ -0,0 +1,161 @@ +# frozen_string_literal: true + +module JobTracker + def self.included(base) # :nodoc: + base.extend ClassMethods + end + + module ClassMethods + def find(job_id) + find_in_queue(job_id) || find_in_working(job_id) + end + + def find_in_queue(job_id) + sq_job = SolidQueue::Job.find_by(active_job_id: job_id) + return sq_job if sq_job&.ready? + + resque_find_in_queue(job_id) + end + + def find_in_queue_by_payload(job_class, &block) + jobs = SolidQueue::Job + .where(class_name: job_class.to_s, finished_at: nil) + .where.not(id: SolidQueue::ClaimedExecution.select(:job_id)) + .flat_map { |j| Array.wrap(j.arguments) } + + # Also check Resque for draining jobs + jobs += resque_find_in_queue_by_payload(job_class) + + return jobs unless block_given? + + jobs.detect(&block) + end + + def find_in_working(job_id) + sq_job = SolidQueue::Job.find_by(active_job_id: job_id) + return sq_job if sq_job&.claimed? + + resque_find_in_working(job_id) + end + + def find_in_working_by_payload(job_class, &block) + jobs = SolidQueue::Job + .where(class_name: job_class.to_s, finished_at: nil) + .joins(:claimed_execution) + .flat_map { |j| Array.wrap(j.arguments) } + + # Also check Resque for draining jobs + jobs += resque_find_in_working_by_payload(job_class) + + return jobs unless block_given? + + jobs.detect(&block) + end + + def fetch_result(job_id) + record = JobResult.find_by(job_id: job_id) + record&.result + end + + def result_key(job_id) + [name.underscore, job_id].join(":") + end + + def status(job_id) + # Check Solid Queue first + sq_job = SolidQueue::Job.find_by(active_job_id: job_id) + if sq_job + return :waiting if sq_job.ready? + return :running if sq_job.claimed? + return :done if sq_job.finished? 
+ return :done if sq_job.failed? + end + + # Check if result exists (job already finished and was cleared) + return :done if JobResult.exists?(job_id: job_id) + + # Fallback: check Resque for draining jobs + if resque_available? + return :waiting if resque_find_in_queue(job_id) + return :running if resque_find_in_working(job_id) + end + + :done + end + + private + + def resque_available? + defined?(Resque) && (Resque.redis.connected? rescue false) + end + + def resque_find_in_queue(job_id) + return nil unless resque_available? + + Resque.peek(queue_name, 0, 0) + .map { |job| job["args"].first } + .detect { |job| job["job_id"] == job_id } + rescue StandardError + nil + end + + def resque_find_in_queue_by_payload(job_class) + return [] unless resque_available? + + jobs = Array.wrap Resque.peek(queue_name, 0, 0) + jobs + .select { |j| j["args"].first["job_class"] == job_class.to_s } + .flat_map { |j| j["args"] } + rescue StandardError + [] + end + + def resque_find_in_working(job_id) + return nil unless resque_available? + + Resque::Worker.working.map(&:job).detect do |job| + if job.is_a?(Hash) && (args = job.dig "payload", "args").is_a?(Array) + args.detect { |x| x["job_id"] == job_id } + end + end + rescue StandardError + nil + end + + def resque_find_in_working_by_payload(job_class) + return [] unless resque_available? 
+ + Resque::Worker.working.map(&:job).flat_map do |job| + next unless job.is_a?(Hash) && (args = job.dig "payload", "args").is_a?(Array) + + args.select { |x| x["job_class"] == job_class.to_s } + end.compact + rescue StandardError + [] + end + end + + def result_key + @result_key ||= self.class.result_key(job_id) + end + + def store_initial_result(res, options = {}) + jid = options[:initial_job_id].presence || job_id + JobResult.upsert( + { job_id: jid, parent_job_id: nil, job_class: self.class.name, result: res }, + unique_by: :job_id + ) + end + + # + # @param [Hash] res + # @param [Hash] options + # + def store_result(res, options = {}) + parent_jid = options[:initial_job_id].presence + JobResult.upsert( + { job_id: job_id, parent_job_id: parent_jid, job_class: self.class.name, result: res }, + unique_by: :job_id + ) + end +end diff --git a/spec/jobs/concerns/nested_job_tracker_spec.rb b/spec/jobs/concerns/nested_job_tracker_spec.rb new file mode 100644 index 0000000..342ffde --- /dev/null +++ b/spec/jobs/concerns/nested_job_tracker_spec.rb @@ -0,0 +1,78 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe NestedJobTracker do + let(:test_class) do + Class.new(ApplicationJob) do + include JobTracker + include NestedJobTracker + + NESTED_JOBS = %w(DocumentPdfJob MaterialPdfJob).freeze + + queue_as :default + + def perform(entry_id, options = {}); end + end + end + + before { stub_const("TestNestedTrackerJob", test_class) } + + describe ".status_nested" do + let(:jid) { SecureRandom.uuid } + + context "when parent job is done and no nested jobs are running" do + before do + allow(TestNestedTrackerJob).to receive(:status).with(jid).and_return(:done) + allow(TestNestedTrackerJob).to receive(:queued_or_running_nested?).with(jid).and_return(false) + end + + it "returns :done" do + expect(TestNestedTrackerJob.status_nested(jid)).to eq(:done) + end + end + + context "when parent job is done but nested jobs are still running" do + before do + 
allow(TestNestedTrackerJob).to receive(:status).with(jid).and_return(:done) + allow(TestNestedTrackerJob).to receive(:queued_or_running_nested?).with(jid).and_return(true) + end + + it "returns :running" do + expect(TestNestedTrackerJob.status_nested(jid)).to eq(:running) + end + end + + context "when parent job is still waiting" do + before do + allow(TestNestedTrackerJob).to receive(:status).with(jid).and_return(:waiting) + end + + it "returns :waiting" do + expect(TestNestedTrackerJob.status_nested(jid)).to eq(:waiting) + end + end + end + + describe ".fetch_result_nested" do + let(:parent_jid) { SecureRandom.uuid } + + before do + JobResult.create!(job_id: "child-1", parent_job_id: parent_jid, job_class: "DocumentPdfJob", result: { ok: true }) + JobResult.create!(job_id: "child-2", parent_job_id: parent_jid, job_class: "MaterialPdfJob", result: { ok: false, errors: ["fail"] }) + JobResult.create!(job_id: "other-1", parent_job_id: "other-parent", job_class: "DocumentPdfJob", result: { ok: true }) + end + + it "returns results for all nested jobs with the given parent_job_id" do + results = TestNestedTrackerJob.fetch_result_nested(parent_jid) + expect(results.size).to eq(2) + expect(results).to include({ "ok" => true }) + expect(results).to include({ "ok" => false, "errors" => ["fail"] }) + end + + it "does not return results from other parents" do + results = TestNestedTrackerJob.fetch_result_nested(parent_jid) + expect(results.size).to eq(2) + end + end +end diff --git a/spec/jobs/concerns/unit_level_job_spec.rb b/spec/jobs/concerns/unit_level_job_spec.rb index a54f0d7..65aabac 100644 --- a/spec/jobs/concerns/unit_level_job_spec.rb +++ b/spec/jobs/concerns/unit_level_job_spec.rb @@ -6,7 +6,7 @@ # Create a test class that includes the concern let(:test_class) do Class.new(ApplicationJob) do - include ResqueJob + include JobTracker include UnitLevelJob def perform(entry_id, options = {}); end diff --git a/spec/jobs/document_gdoc_job_spec.rb 
b/spec/jobs/document_gdoc_job_spec.rb index 854f4ee..aad264e 100644 --- a/spec/jobs/document_gdoc_job_spec.rb +++ b/spec/jobs/document_gdoc_job_spec.rb @@ -12,8 +12,8 @@ expect(described_class.ancestors).to include(DocumentRescuableJob) end - it "includes ResqueJob" do - expect(described_class.ancestors).to include(ResqueJob) + it "includes JobTracker" do + expect(described_class.ancestors).to include(JobTracker) end end diff --git a/spec/jobs/document_pdf_job_spec.rb b/spec/jobs/document_pdf_job_spec.rb index a801f99..d3339f0 100644 --- a/spec/jobs/document_pdf_job_spec.rb +++ b/spec/jobs/document_pdf_job_spec.rb @@ -12,8 +12,8 @@ expect(described_class.ancestors).to include(DocumentRescuableJob) end - it "includes ResqueJob" do - expect(described_class.ancestors).to include(ResqueJob) + it "includes JobTracker" do + expect(described_class.ancestors).to include(JobTracker) end end diff --git a/spec/jobs/material_gdoc_job_spec.rb b/spec/jobs/material_gdoc_job_spec.rb index d823783..34aff4e 100644 --- a/spec/jobs/material_gdoc_job_spec.rb +++ b/spec/jobs/material_gdoc_job_spec.rb @@ -12,8 +12,8 @@ expect(described_class.ancestors).to include(MaterialRescuableJob) end - it "includes ResqueJob" do - expect(described_class.ancestors).to include(ResqueJob) + it "includes JobTracker" do + expect(described_class.ancestors).to include(JobTracker) end end diff --git a/spec/jobs/material_pdf_job_spec.rb b/spec/jobs/material_pdf_job_spec.rb index 2b1637b..abe0c12 100644 --- a/spec/jobs/material_pdf_job_spec.rb +++ b/spec/jobs/material_pdf_job_spec.rb @@ -12,8 +12,8 @@ expect(described_class.ancestors).to include(MaterialRescuableJob) end - it "includes ResqueJob" do - expect(described_class.ancestors).to include(ResqueJob) + it "includes JobTracker" do + expect(described_class.ancestors).to include(JobTracker) end end diff --git a/spec/lib/job_tracker_spec.rb b/spec/lib/job_tracker_spec.rb new file mode 100644 index 0000000..fa47aba --- /dev/null +++ 
b/spec/lib/job_tracker_spec.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe JobTracker do + let(:test_class) do + Class.new(ApplicationJob) do + include JobTracker + queue_as :default + + def perform; end + end + end + + before { stub_const("TestTrackerJob", test_class) } + + describe ".status" do + let(:job_id) { SecureRandom.uuid } + + context "when job has a result in the database" do + before do + JobResult.create!(job_id: job_id, job_class: "TestTrackerJob", result: { ok: true }) + end + + it "returns :done" do + expect(TestTrackerJob.status(job_id)).to eq(:done) + end + end + + context "when job is unknown" do + it "returns :done" do + expect(TestTrackerJob.status(SecureRandom.uuid)).to eq(:done) + end + end + end + + describe ".fetch_result" do + let(:job_id) { SecureRandom.uuid } + + context "when result exists" do + before do + JobResult.create!(job_id: job_id, job_class: "TestTrackerJob", result: { "ok" => true, "link" => "https://example.com" }) + end + + it "returns the result hash" do + result = TestTrackerJob.fetch_result(job_id) + expect(result).to eq({ "ok" => true, "link" => "https://example.com" }) + end + end + + context "when result does not exist" do + it "returns nil" do + expect(TestTrackerJob.fetch_result(SecureRandom.uuid)).to be_nil + end + end + end + + describe ".result_key" do + it "returns a key with underscored class name and job_id" do + expect(TestTrackerJob.result_key("abc123")).to eq("test_tracker_job:abc123") + end + end + + describe "#store_result" do + let(:job) { TestTrackerJob.new } + + it "stores result in JobResult table" do + result = { ok: true, link: "https://example.com" } + job.store_result(result) + + stored = JobResult.find_by(job_id: job.job_id) + expect(stored).to be_present + expect(stored.result).to eq(result.stringify_keys) + end + + it "stores result with parent_job_id when initial_job_id is provided" do + result = { ok: true } + job.store_result(result, 
initial_job_id: "parent-123") + + stored = JobResult.find_by(job_id: job.job_id) + expect(stored.parent_job_id).to eq("parent-123") + end + end + + describe "#store_initial_result" do + let(:job) { TestTrackerJob.new } + + it "stores result keyed by job_id" do + result = { ok: true } + job.store_initial_result(result) + + stored = JobResult.find_by(job_id: job.job_id) + expect(stored).to be_present + expect(stored.result).to eq(result.stringify_keys) + end + + it "stores result keyed by initial_job_id when provided" do + result = { ok: true } + job.store_initial_result(result, initial_job_id: "initial-456") + + stored = JobResult.find_by(job_id: "initial-456") + expect(stored).to be_present + end + end +end diff --git a/spec/models/job_result_spec.rb b/spec/models/job_result_spec.rb new file mode 100644 index 0000000..68f2e81 --- /dev/null +++ b/spec/models/job_result_spec.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe JobResult do + describe "scopes" do + describe ".expired" do + it "returns records older than 1 hour" do + expired = described_class.create!(job_id: "expired-1", job_class: "TestJob", created_at: 2.hours.ago) + recent = described_class.create!(job_id: "recent-1", job_class: "TestJob", created_at: 30.minutes.ago) + + expect(described_class.expired).to include(expired) + expect(described_class.expired).not_to include(recent) + end + end + + describe ".for_parent" do + it "returns records with matching parent_job_id" do + parent = described_class.create!(job_id: "child-1", parent_job_id: "parent-1", job_class: "TestJob") + other = described_class.create!(job_id: "child-2", parent_job_id: "parent-2", job_class: "TestJob") + + expect(described_class.for_parent("parent-1")).to include(parent) + expect(described_class.for_parent("parent-1")).not_to include(other) + end + end + end + + describe ".cleanup_expired" do + it "deletes expired records" do + described_class.create!(job_id: "expired-1", job_class: 
"TestJob", created_at: 2.hours.ago) + described_class.create!(job_id: "recent-1", job_class: "TestJob", created_at: 30.minutes.ago) + + expect { described_class.cleanup_expired }.to change(described_class, :count).by(-1) + expect(described_class.find_by(job_id: "recent-1")).to be_present + end + end +end From 5117509fd54eb1058fab821b0a17359486e44a50 Mon Sep 17 00:00:00 2001 From: Sergei Alekseenko Date: Thu, 19 Mar 2026 23:54:34 +0400 Subject: [PATCH 2/3] refactoring - remove resque --- .env.docker | 8 +- .env.example | 8 +- Gemfile | 2 - Gemfile.lock | 52 +++------ app/jobs/concerns/nested_resque_job.rb | 56 ---------- config/initializers/resque.rb | 14 --- config/routes.rb | 3 +- config/solid_queue.yml | 10 +- db/queue_schema.rb | 129 ---------------------- db/schema.rb | 141 ++++++++++++++++++++++++- docker-compose.yml | 20 ---- lib/job_tracker.rb | 78 +------------- lib/resque_job.rb | 91 ---------------- lib/tasks/resque.rake | 20 ---- 14 files changed, 178 insertions(+), 454 deletions(-) delete mode 100644 app/jobs/concerns/nested_resque_job.rb delete mode 100644 config/initializers/resque.rb delete mode 100644 db/queue_schema.rb delete mode 100644 lib/resque_job.rb delete mode 100644 lib/tasks/resque.rake diff --git a/.env.docker b/.env.docker index 9483a90..460f514 100644 --- a/.env.docker +++ b/.env.docker @@ -30,8 +30,12 @@ REDIS_URL=redis://redis:6379/0 # Configuration for asynchronous job processing # ============================================================================== #BACKGROUND_JOBS= # Enable/disable background job processing -#RESQUE_NAMESPACE= # Namespace for Resque jobs -#WORKERS_COUNT= # Number of worker processes +#SOLID_QUEUE_IN_PUMA= # Run Solid Queue supervisor inside Puma (for single-server deploys) +#JOB_CONCURRENCY=2 # Number of Solid Queue worker processes +#JOB_THREADS=3 # Number of threads per worker process +#JOB_POLLING_INTERVAL=0.1 # Worker polling interval in seconds +#JOB_DISPATCHER_POLLING_INTERVAL=1 # Dispatcher polling 
interval in seconds +#JOB_DISPATCHER_BATCH_SIZE=500 # Number of jobs dispatched per batch # ============================================================================== # GOOGLE API INTEGRATION diff --git a/.env.example b/.env.example index e79e4bb..d9ad36c 100644 --- a/.env.example +++ b/.env.example @@ -33,8 +33,12 @@ AWS_S3_PREVIEW_FOLDER= # S3 folder for storing previews # Configuration for asynchronous job processing # ============================================================================== BACKGROUND_JOBS= # Enable/disable background job processing -RESQUE_NAMESPACE= # Namespace for Resque jobs -WORKERS_COUNT= # Number of worker processes +SOLID_QUEUE_IN_PUMA= # Run Solid Queue supervisor inside Puma (for single-server deploys) +JOB_CONCURRENCY= # Number of Solid Queue worker processes (default: 2) +JOB_THREADS= # Number of threads per worker process (default: 3) +JOB_POLLING_INTERVAL= # Worker polling interval in seconds (default: 0.1) +JOB_DISPATCHER_POLLING_INTERVAL= # Dispatcher polling interval in seconds (default: 1) +JOB_DISPATCHER_BATCH_SIZE= # Number of jobs dispatched per batch (default: 500) # ============================================================================== # GOOGLE API INTEGRATION diff --git a/Gemfile b/Gemfile index fbbca2a..7c74fd5 100644 --- a/Gemfile +++ b/Gemfile @@ -84,8 +84,6 @@ gem "devise", "~> 5.0" # Background Jobs & Queue gem "redis" gem "hiredis-client" -gem "resque" # Kept temporarily for draining in-flight jobs -gem "resque-scheduler", "~> 5.0" # Kept temporarily for draining in-flight jobs gem "activejob-retry", "~> 0.6.3" gem "concurrent-ruby", "~> 1.3" gem "mission_control-jobs" diff --git a/Gemfile.lock b/Gemfile.lock index 11d9db0..221a944 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -320,6 +320,10 @@ GEM image_processing (1.14.0) mini_magick (>= 4.9.5, < 6) ruby-vips (>= 2.0.17, < 3) + importmap-rails (2.2.3) + actionpack (>= 6.0.0) + activesupport (>= 6.0.0) + railties (>= 6.0.0) iniparse (1.5.0) 
io-console (0.8.2) irb (1.17.0) @@ -387,15 +391,22 @@ GEM mini_mime (1.1.5) minitest (6.0.1) prism (~> 1.5) + mission_control-jobs (1.1.0) + actioncable (>= 7.1) + actionpack (>= 7.1) + activejob (>= 7.1) + activerecord (>= 7.1) + importmap-rails (>= 1.2.1) + irb (~> 1.13) + railties (>= 7.1) + stimulus-rails + turbo-rails mock_redis (0.53.0) redis (~> 5) - mono_logger (1.1.2) msgpack (1.8.0) multi_json (1.19.1) multi_xml (0.8.1) bigdecimal (>= 3.1, < 5) - mustermann (3.0.4) - ruby2_keywords (~> 0.0.1) mutex_m (0.3.0) net-http (0.9.1) uri (>= 0.11.1) @@ -468,10 +479,6 @@ GEM rack (3.2.4) rack-mini-profiler (4.0.1) rack (>= 1.2.0) - rack-protection (4.2.1) - base64 (>= 0.1.0) - logger (>= 1.6.0) - rack (>= 3.0.0, < 4) rack-session (2.1.1) base64 (>= 0.1.0) rack (>= 3.0.0) @@ -534,8 +541,6 @@ GEM redis-client (>= 0.22.0) redis-client (0.26.4) connection_pool - redis-namespace (1.11.0) - redis (>= 4) regexp_parser (2.11.3) reline (0.6.3) io-console (~> 0.5) @@ -546,21 +551,6 @@ GEM responders (3.2.0) actionpack (>= 7.0) railties (>= 7.0) - resque (3.0.0) - base64 (~> 0.1) - logger - mono_logger (~> 1) - multi_json (~> 1.0) - redis (>= 4.0) - redis-namespace (~> 1.6) - sinatra (>= 2.0) - resque-scheduler (5.0.0) - base64 (~> 0.1) - logger - mono_logger (~> 1.0) - redis (>= 4.0) - resque (>= 3.0) - rufus-scheduler (~> 3.2, != 3.3) rest-client (2.1.0) http-accept (>= 1.7.0, < 2.0) http-cookie (>= 1.0.2, < 2.0) @@ -618,10 +608,7 @@ GEM ruby-vips (2.2.5) ffi (~> 1.12) logger - ruby2_keywords (0.0.5) rubyzip (2.4.1) - rufus-scheduler (3.9.2) - fugit (~> 1.1, >= 1.11.1) sanitize (7.0.0) crass (~> 1.0.2) nokogiri (>= 1.16.8) @@ -652,13 +639,6 @@ GEM simplecov_json_formatter (~> 0.1) simplecov-html (0.13.2) simplecov_json_formatter (0.1.4) - sinatra (4.2.1) - logger (>= 1.6.0) - mustermann (~> 3.0) - rack (>= 3.0.0, < 4) - rack-protection (= 4.2.1) - rack-session (>= 2.0.0, < 3) - tilt (~> 2.0) solid_cable (3.0.12) actioncable (>= 7.2) activejob (>= 7.2) @@ -720,7 +700,6 @@ 
GEM thruster (0.1.16-aarch64-linux) thruster (0.1.16-arm64-darwin) thruster (0.1.16-x86_64-linux) - tilt (2.7.0) timeout (0.6.0) traceroute (0.8.1) rails (>= 3.0.0) @@ -825,6 +804,7 @@ DEPENDENCIES lt-google-api (~> 0.4) lt-lcms (~> 0.7) mini_magick (~> 5.3) + mission_control-jobs mock_redis nokogiri (~> 1.19) oj (~> 3.16) @@ -840,8 +820,6 @@ DEPENDENCIES ransack (~> 4.2) rbs_rails redis - resque - resque-scheduler (~> 5.0) rest-client (~> 2.1, >= 2.1.0) retriable (~> 3.1) rspec-rails (~> 8.0) diff --git a/app/jobs/concerns/nested_resque_job.rb b/app/jobs/concerns/nested_resque_job.rb deleted file mode 100644 index 47ef93b..0000000 --- a/app/jobs/concerns/nested_resque_job.rb +++ /dev/null @@ -1,56 +0,0 @@ -# frozen_string_literal: true - -module NestedResqueJob - extend ActiveSupport::Concern - - class_methods do # rubocop:disable Metrics/BlockLength - def queued_or_running_nested?(job_id, current_job_id = "-1") - check_child = ->(j) { j["arguments"][1]&.dig("initial_job_id") == job_id && j["job_id"] != current_job_id } - job_klasses = self::NESTED_JOBS + [name] - job_klasses.each do |job_klass| - queued = find_in_queue_by_payload(job_klass, &check_child) || - find_in_working_by_payload(job_klass, &check_child) - return true if queued.present? - end - false - end - - def status_nested(jid) - self_status = status(jid) - return self_status unless self_status == :done - return :running if queued_or_running_nested?(jid) - - :done - end - - # Fetches the results of nested jobs in Resque. - # - # This method iterates over each job class in the `NESTED_JOBS` constant, - # constructs a Redis key pattern for the nested job results, and retrieves - # the value from Redis. It attempts to parse the value as JSON and adds it - # to the result array. If the parsing fails, it adds the original value - # to the result array instead. - # - # @param jid [String] The job ID of the parent job. - # @return [Array] An array containing the results of all nested jobs of the parent job. 
- # - # @example - # fetch_result_nested('1234') - def fetch_result_nested(jid) - [].tap do |result| - self::NESTED_JOBS.each do |job_klass| - Resque.redis.scan_each(match: "#{job_klass.constantize.result_key(jid)}*") do |key| - res = Resque.redis.get key - result << JSON.parse(res) rescue res - end - end - end - end - end - - private - - def initial_job_id - @initial_job_id ||= options[:initial_job_id].presence || job_id - end -end diff --git a/config/initializers/resque.rb b/config/initializers/resque.rb deleted file mode 100644 index 84d6538..0000000 --- a/config/initializers/resque.rb +++ /dev/null @@ -1,14 +0,0 @@ -# frozen_string_literal: true - -require "resque/server" - -Resque.redis = ENV.fetch("REDIS_URL", "redis://localhost:6379") -Resque.redis.namespace = ENV.fetch("RESQUE_NAMESPACE", "resque:development") - -project_id = ENV.fetch("AIR_BRAKE_PROJECT_ID", nil) -project_key = ENV.fetch("AIR_BRAKE_PROJECT_KEY", nil) - -if project_id.present? && project_key.present? - require "airbrake/resque" - Resque::Failure.backend = Resque::Failure::Airbrake -end diff --git a/config/routes.rb b/config/routes.rb index 1709ac9..fc685a3 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -22,10 +22,9 @@ registrations: "registrations" } - # Job monitoring dashboards (behind authentication) + # Job monitoring dashboard (behind authentication) authenticate :user do mount MissionControl::Jobs::Engine, at: "/jobs" - mount Resque::Server, at: "/queue" # Kept temporarily for draining monitoring end namespace :admin do diff --git a/config/solid_queue.yml b/config/solid_queue.yml index 9eace59..a0d10cf 100644 --- a/config/solid_queue.yml +++ b/config/solid_queue.yml @@ -1,12 +1,12 @@ default: &default dispatchers: - - polling_interval: 1 - batch_size: 500 + - polling_interval: <%= ENV.fetch("JOB_DISPATCHER_POLLING_INTERVAL", 1) %> + batch_size: <%= ENV.fetch("JOB_DISPATCHER_BATCH_SIZE", 500) %> workers: - queues: "*" - threads: 3 - processes: <%= ENV.fetch("JOB_CONCURRENCY", 
1) %> - polling_interval: 0.1 + threads: <%= ENV.fetch("JOB_THREADS", 3) %> + processes: <%= ENV.fetch("JOB_CONCURRENCY", 2) %> + polling_interval: <%= ENV.fetch("JOB_POLLING_INTERVAL", 0.1) %> development: <<: *default diff --git a/db/queue_schema.rb b/db/queue_schema.rb deleted file mode 100644 index 85194b6..0000000 --- a/db/queue_schema.rb +++ /dev/null @@ -1,129 +0,0 @@ -ActiveRecord::Schema[7.1].define(version: 1) do - create_table "solid_queue_blocked_executions", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "queue_name", null: false - t.integer "priority", default: 0, null: false - t.string "concurrency_key", null: false - t.datetime "expires_at", null: false - t.datetime "created_at", null: false - t.index [ "concurrency_key", "priority", "job_id" ], name: "index_solid_queue_blocked_executions_for_release" - t.index [ "expires_at", "concurrency_key" ], name: "index_solid_queue_blocked_executions_for_maintenance" - t.index [ "job_id" ], name: "index_solid_queue_blocked_executions_on_job_id", unique: true - end - - create_table "solid_queue_claimed_executions", force: :cascade do |t| - t.bigint "job_id", null: false - t.bigint "process_id" - t.datetime "created_at", null: false - t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true - t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" - end - - create_table "solid_queue_failed_executions", force: :cascade do |t| - t.bigint "job_id", null: false - t.text "error" - t.datetime "created_at", null: false - t.index [ "job_id" ], name: "index_solid_queue_failed_executions_on_job_id", unique: true - end - - create_table "solid_queue_jobs", force: :cascade do |t| - t.string "queue_name", null: false - t.string "class_name", null: false - t.text "arguments" - t.integer "priority", default: 0, null: false - t.string "active_job_id" - t.datetime "scheduled_at" - t.datetime "finished_at" - t.string 
"concurrency_key" - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id" - t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name" - t.index [ "finished_at" ], name: "index_solid_queue_jobs_on_finished_at" - t.index [ "queue_name", "finished_at" ], name: "index_solid_queue_jobs_for_filtering" - t.index [ "scheduled_at", "finished_at" ], name: "index_solid_queue_jobs_for_alerting" - end - - create_table "solid_queue_pauses", force: :cascade do |t| - t.string "queue_name", null: false - t.datetime "created_at", null: false - t.index [ "queue_name" ], name: "index_solid_queue_pauses_on_queue_name", unique: true - end - - create_table "solid_queue_processes", force: :cascade do |t| - t.string "kind", null: false - t.datetime "last_heartbeat_at", null: false - t.bigint "supervisor_id" - t.integer "pid", null: false - t.string "hostname" - t.text "metadata" - t.datetime "created_at", null: false - t.string "name", null: false - t.index [ "last_heartbeat_at" ], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true - t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id" - end - - create_table "solid_queue_ready_executions", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "queue_name", null: false - t.integer "priority", default: 0, null: false - t.datetime "created_at", null: false - t.index [ "job_id" ], name: "index_solid_queue_ready_executions_on_job_id", unique: true - t.index [ "priority", "job_id" ], name: "index_solid_queue_poll_all" - t.index [ "queue_name", "priority", "job_id" ], name: "index_solid_queue_poll_by_queue" - end - - create_table "solid_queue_recurring_executions", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "task_key", null: false - t.datetime 
"run_at", null: false - t.datetime "created_at", null: false - t.index [ "job_id" ], name: "index_solid_queue_recurring_executions_on_job_id", unique: true - t.index [ "task_key", "run_at" ], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true - end - - create_table "solid_queue_recurring_tasks", force: :cascade do |t| - t.string "key", null: false - t.string "schedule", null: false - t.string "command", limit: 2048 - t.string "class_name" - t.text "arguments" - t.string "queue_name" - t.integer "priority", default: 0 - t.boolean "static", default: true, null: false - t.text "description" - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index [ "key" ], name: "index_solid_queue_recurring_tasks_on_key", unique: true - t.index [ "static" ], name: "index_solid_queue_recurring_tasks_on_static" - end - - create_table "solid_queue_scheduled_executions", force: :cascade do |t| - t.bigint "job_id", null: false - t.string "queue_name", null: false - t.integer "priority", default: 0, null: false - t.datetime "scheduled_at", null: false - t.datetime "created_at", null: false - t.index [ "job_id" ], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true - t.index [ "scheduled_at", "priority", "job_id" ], name: "index_solid_queue_dispatch_all" - end - - create_table "solid_queue_semaphores", force: :cascade do |t| - t.string "key", null: false - t.integer "value", default: 1, null: false - t.datetime "expires_at", null: false - t.datetime "created_at", null: false - t.datetime "updated_at", null: false - t.index [ "expires_at" ], name: "index_solid_queue_semaphores_on_expires_at" - t.index [ "key", "value" ], name: "index_solid_queue_semaphores_on_key_and_value" - t.index [ "key" ], name: "index_solid_queue_semaphores_on_key", unique: true - end - - add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key 
"solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade -end diff --git a/db/schema.rb b/db/schema.rb index 3514c17..bf0edbe 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[8.1].define(version: 2026_02_19_081358) do +ActiveRecord::Schema[8.1].define(version: 2026_03_19_000002) do # These are extensions that must be enabled in order to support this database enable_extension "hstore" enable_extension "pg_catalog.plpgsql" @@ -84,6 +84,18 @@ t.index ["resource_id"], name: "index_documents_on_resource_id" end + create_table "job_results", force: :cascade do |t| + t.datetime "created_at", null: false + t.string "job_class", null: false + t.string "job_id", null: false + t.string "parent_job_id" + t.jsonb "result", default: {} + t.datetime "updated_at", null: false + t.index ["job_id"], name: "index_job_results_on_job_id", unique: true + t.index ["parent_job_id", "job_class"], name: "index_job_results_on_parent_job_id_and_job_class" + t.index ["parent_job_id"], name: "index_job_results_on_parent_job_id" + end + create_table "lcms_engine_integrations_webhook_configurations", force: :cascade do |t| t.string "action", default: "post", null: false t.boolean "active", default: true @@ -181,6 +193,127 @@ t.index ["key"], name: "index_settings_on_key", unique: true end + create_table "solid_queue_blocked_executions", force: :cascade do |t| + t.string "concurrency_key", null: false + 
t.datetime "created_at", null: false + t.datetime "expires_at", null: false + t.bigint "job_id", null: false + t.integer "priority", default: 0, null: false + t.string "queue_name", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + + create_table "solid_queue_claimed_executions", force: :cascade do |t| + t.datetime "created_at", null: false + t.bigint "job_id", null: false + t.bigint "process_id" + t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + end + + create_table "solid_queue_failed_executions", force: :cascade do |t| + t.datetime "created_at", null: false + t.text "error" + t.bigint "job_id", null: false + t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true + end + + create_table "solid_queue_jobs", force: :cascade do |t| + t.string "active_job_id" + t.text "arguments" + t.string "class_name", null: false + t.string "concurrency_key" + t.datetime "created_at", null: false + t.datetime "finished_at" + t.integer "priority", default: 0, null: false + t.string "queue_name", null: false + t.datetime "scheduled_at" + t.datetime "updated_at", null: false + t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" + t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" + t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" + t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" + end + + create_table "solid_queue_pauses", force: :cascade do |t| + 
t.datetime "created_at", null: false + t.string "queue_name", null: false + t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true + end + + create_table "solid_queue_processes", force: :cascade do |t| + t.datetime "created_at", null: false + t.string "hostname" + t.string "kind", null: false + t.datetime "last_heartbeat_at", null: false + t.text "metadata" + t.string "name", null: false + t.integer "pid", null: false + t.bigint "supervisor_id" + t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" + t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" + end + + create_table "solid_queue_ready_executions", force: :cascade do |t| + t.datetime "created_at", null: false + t.bigint "job_id", null: false + t.integer "priority", default: 0, null: false + t.string "queue_name", null: false + t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true + t.index ["priority", "job_id"], name: "index_solid_queue_poll_all" + t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue" + end + + create_table "solid_queue_recurring_executions", force: :cascade do |t| + t.datetime "created_at", null: false + t.bigint "job_id", null: false + t.datetime "run_at", null: false + t.string "task_key", null: false + t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true + t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true + end + + create_table "solid_queue_recurring_tasks", force: :cascade do |t| + t.text "arguments" + t.string "class_name" + t.string "command", limit: 2048 + t.datetime "created_at", null: false + t.text "description" + t.string "key", null: false + t.integer "priority", default: 0 + t.string "queue_name" + t.string 
"schedule", null: false + t.boolean "static", default: true, null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + + create_table "solid_queue_scheduled_executions", force: :cascade do |t| + t.datetime "created_at", null: false + t.bigint "job_id", null: false + t.integer "priority", default: 0, null: false + t.string "queue_name", null: false + t.datetime "scheduled_at", null: false + t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true + t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all" + end + + create_table "solid_queue_semaphores", force: :cascade do |t| + t.datetime "created_at", null: false + t.datetime "expires_at", null: false + t.string "key", null: false + t.datetime "updated_at", null: false + t.integer "value", default: 1, null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + create_table "standards", id: :serial, force: :cascade do |t| t.text "alt_names", default: [], null: false, array: true t.string "course" @@ -245,4 +378,10 @@ add_foreign_key "resource_standards", "resources" add_foreign_key "resource_standards", "standards" + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_recurring_executions", 
"solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade end diff --git a/docker-compose.yml b/docker-compose.yml index 4a11e3d..784c9ea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,26 +50,6 @@ services: - .env.docker - .env.development - # Keep resque temporarily to drain in-flight jobs during migration - resque: - image: lcms-core:dev - command: | - bash -c "bundle install && env QUEUES=* COUNT=${RESQUE_COUNT:-1} bundle exec rake resque:workers" - volumes: - - .:/app - - bundle:/usr/local/bundle - depends_on: - - db - - redis - env_file: - - .env.docker - - .env.development - environment: - - TERM_CHILD=1 - - RESQUE_TERM_TIMEOUT=10 - profiles: - - resque-drain - css: image: lcms-core:dev command: | diff --git a/lib/job_tracker.rb b/lib/job_tracker.rb index e5dd07a..f74b636 100644 --- a/lib/job_tracker.rb +++ b/lib/job_tracker.rb @@ -12,9 +12,7 @@ def find(job_id) def find_in_queue(job_id) sq_job = SolidQueue::Job.find_by(active_job_id: job_id) - return sq_job if sq_job&.ready? - - resque_find_in_queue(job_id) + sq_job if sq_job&.ready_execution&.present? end def find_in_queue_by_payload(job_class, &block) @@ -23,9 +21,6 @@ def find_in_queue_by_payload(job_class, &block) .where.not(id: SolidQueue::ClaimedExecution.select(:job_id)) .flat_map { |j| Array.wrap(j.arguments) } - # Also check Resque for draining jobs - jobs += resque_find_in_queue_by_payload(job_class) - return jobs unless block_given? jobs.detect(&block) @@ -33,9 +28,7 @@ def find_in_queue_by_payload(job_class, &block) def find_in_working(job_id) sq_job = SolidQueue::Job.find_by(active_job_id: job_id) - return sq_job if sq_job&.claimed? - - resque_find_in_working(job_id) + sq_job if sq_job&.claimed_execution&.present? 
end def find_in_working_by_payload(job_class, &block) @@ -44,9 +37,6 @@ def find_in_working_by_payload(job_class, &block) .joins(:claimed_execution) .flat_map { |j| Array.wrap(j.arguments) } - # Also check Resque for draining jobs - jobs += resque_find_in_working_by_payload(job_class) - return jobs unless block_given? jobs.detect(&block) @@ -62,77 +52,19 @@ def result_key(job_id) end def status(job_id) - # Check Solid Queue first sq_job = SolidQueue::Job.find_by(active_job_id: job_id) if sq_job - return :waiting if sq_job.ready? - return :running if sq_job.claimed? + return :waiting if sq_job.ready_execution.present? + return :running if sq_job.claimed_execution.present? return :done if sq_job.finished? - return :done if sq_job.failed? + return :done if sq_job.failed_execution.present? end # Check if result exists (job already finished and was cleared) return :done if JobResult.exists?(job_id: job_id) - # Fallback: check Resque for draining jobs - if resque_available? - return :waiting if resque_find_in_queue(job_id) - return :running if resque_find_in_working(job_id) - end - :done end - - private - - def resque_available? - defined?(Resque) && (Resque.redis.connected? rescue false) - end - - def resque_find_in_queue(job_id) - return nil unless resque_available? - - Resque.peek(queue_name, 0, 0) - .map { |job| job["args"].first } - .detect { |job| job["job_id"] == job_id } - rescue StandardError - nil - end - - def resque_find_in_queue_by_payload(job_class) - return [] unless resque_available? - - jobs = Array.wrap Resque.peek(queue_name, 0, 0) - jobs - .select { |j| j["args"].first["job_class"] == job_class.to_s } - .flat_map { |j| j["args"] } - rescue StandardError - [] - end - - def resque_find_in_working(job_id) - return nil unless resque_available? 
- - Resque::Worker.working.map(&:job).detect do |job| - if job.is_a?(Hash) && (args = job.dig "payload", "args").is_a?(Array) - args.detect { |x| x["job_id"] == job_id } - end - end - rescue StandardError - nil - end - - def resque_find_in_working_by_payload(job_class) - return [] unless resque_available? - - Resque::Worker.working.map(&:job).flat_map do |job| - next unless job.is_a?(Hash) && (args = job.dig "payload", "args").is_a?(Array) - - args.select { |x| x["job_class"] == job_class.to_s } - end.compact - rescue StandardError - [] - end end def result_key diff --git a/lib/resque_job.rb b/lib/resque_job.rb deleted file mode 100644 index 579b884..0000000 --- a/lib/resque_job.rb +++ /dev/null @@ -1,91 +0,0 @@ -# frozen_string_literal: true - -module ResqueJob - def self.included(base) # :nodoc: - base.extend ClassMethods - end - - module ClassMethods - def find(job_id) - find_in_queue(job_id) || find_in_working(job_id) - end - - def find_in_queue(job_id) - Resque.peek(queue_name, 0, 0) - .map { |job| job["args"].first } - .detect { |job| job["job_id"] == job_id } - end - - def find_in_queue_by_payload(job_class, &) - jobs = Array.wrap Resque.peek(queue_name, 0, 0) - result = jobs - .select { |j| j["args"].first["job_class"] == job_class.to_s } - .flat_map { |j| j["args"] } - return result unless block_given? - - result.detect(&) - end - - def find_in_working(job_id) - Resque::Worker.working.map(&:job).detect do |job| - if job.is_a?(Hash) && (args = job.dig "payload", "args").is_a?(Array) - args.detect { |x| x["job_id"] == job_id } - end - end - end - - def find_in_working_by_payload(job_class, &) - result = - Resque::Worker.working.map(&:job).flat_map do |job| - next unless job.is_a?(Hash) && (args = job.dig "payload", "args").is_a?(Array) - - args.select { |x| x["job_class"] == job_class.to_s } - end.compact - return result unless block_given? 
- - result.detect(&) - end - - def fetch_result(job_id) - res = Resque.redis.get result_key(job_id) - JSON.parse(res) rescue res - end - - def result_key(job_id) - [Resque.redis.namespace, "result", name.underscore, job_id].join(":") - end - - def status(job_id) - if find_in_queue(job_id) - :waiting - elsif find_in_working(job_id) - :running - else - :done - end - end - end - - def result_key - @result_key ||= self.class.result_key(job_id) - end - - def store_initial_result(res, options = {}) - key = self.class.result_key(options[:initial_job_id].presence || job_id) - Resque.redis.set(key, res.to_json, ex: 1.hour.to_i) - end - - # - # @param [Hash] res - # @param [Hash] options - # - def store_result(res, options = {}) - key = if (jid = options[:initial_job_id]).blank? - result_key - else - # store result with parent job id to retrieve the result later knowing only parent job id - [Resque.redis.namespace, "result", self.class.name.to_s.underscore, jid, job_id].join(":") - end - Resque.redis.set(key, res.to_json, ex: 1.hour.to_i) - end -end diff --git a/lib/tasks/resque.rake b/lib/tasks/resque.rake deleted file mode 100644 index e688f55..0000000 --- a/lib/tasks/resque.rake +++ /dev/null @@ -1,20 +0,0 @@ -# frozen_string_literal: true - -require "resque/tasks" - -namespace :resque do - task setup: :environment do - ENV["QUEUE"] ||= "default" - - # Solution for the "prepared statements" on ActiveJob issue: - # Error while trying to deserialize arguments: PG::DuplicatePstatement: ERROR: prepared - # statement "a1" already exists - # - # Some references on this here: - # https://github.com/rails/rails/pull/17607 - # https://github.com/rails/rails/pull/25827 - Resque.before_fork do - ActiveRecord::Base.establish_connection - end - end -end From e37fd054b9222f2c41a9601bfbf4495454be6830 Mon Sep 17 00:00:00 2001 From: Sergei Alekseenko Date: Sat, 21 Mar 2026 00:40:44 +0400 Subject: [PATCH 3/3] refactor retry strategy for solid_queue --- CLAUDE.md | 2 +- Gemfile | 1 - 
Gemfile.lock | 4 -- MIGRATION_PLAN.md | 19 ++---- app/jobs/concerns/document_rescuable_job.rb | 4 +- app/jobs/concerns/material_rescuable_job.rb | 2 +- app/jobs/concerns/retry_delayed.rb | 41 +++++++----- app/jobs/concerns/retry_simple.rb | 11 ---- spec/jobs/concerns/retry_delayed_spec.rb | 73 +++++++++++++++++++++ 9 files changed, 107 insertions(+), 50 deletions(-) delete mode 100644 app/jobs/concerns/retry_simple.rb create mode 100644 spec/jobs/concerns/retry_delayed_spec.rb diff --git a/CLAUDE.md b/CLAUDE.md index 9f4a5a6..7cf6cb3 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -233,7 +233,7 @@ Jobs are in `app/jobs/` and use Resque with ActiveJob: - `MaterialGeneratePdfJob`: PDF generation - `MaterialGenerateGdocJob`: Google Doc generation -All jobs inherit from `ApplicationJob` with retry logic via `activejob-retry`. +All jobs inherit from `ApplicationJob` with retry logic via Rails' built-in `retry_on`. ### Template System (lib/doc_template) diff --git a/Gemfile b/Gemfile index 7c74fd5..1a8086b 100644 --- a/Gemfile +++ b/Gemfile @@ -84,7 +84,6 @@ gem "devise", "~> 5.0" # Background Jobs & Queue gem "redis" gem "hiredis-client" -gem "activejob-retry", "~> 0.6.3" gem "concurrent-ruby", "~> 1.3" gem "mission_control-jobs" diff --git a/Gemfile.lock b/Gemfile.lock index 221a944..d3248ce 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -55,9 +55,6 @@ GEM activejob (8.1.1) activesupport (= 8.1.1) globalid (>= 0.3.6) - activejob-retry (0.6.3) - activejob (>= 4.2) - activesupport (>= 4.2) activemodel (8.1.1) activesupport (= 8.1.1) activerecord (8.1.1) @@ -762,7 +759,6 @@ PLATFORMS DEPENDENCIES active_model_serializers (~> 0.10.16) - activejob-retry (~> 0.6.3) acts-as-taggable-on (~> 13.0) acts_as_list (~> 1.0) airbrake (~> 13.0) diff --git a/MIGRATION_PLAN.md b/MIGRATION_PLAN.md index d10f303..0920260 100644 --- a/MIGRATION_PLAN.md +++ b/MIGRATION_PLAN.md @@ -294,9 +294,10 @@ TO: app/controllers/*.rb #### 5.2. 
Job Concerns - [x] Migrate concerns from `app/jobs/concerns/` - - NestedResqueJob - - RetryDelayed - - RetrySimple + - NestedResqueJob → NestedJobTracker (uses JobResult DB table instead of Redis) + - RetryDelayed → rewritten with native Rails `retry_on` / `discard_on` + - RetrySimple → removed (unused) + - Removed `activejob-retry` gem dependency #### 5.3. Queue Configuration - [x] Decide: Resque vs Solid Queue (chose Resque) @@ -305,17 +306,7 @@ TO: app/controllers/*.rb - [x] Migrate lib/resque_job.rb - [x] Migrate lib/tasks/resque.rake -**Option A: Keep Resque** -```ruby -# config/application.rb -config.active_job.queue_adapter = :resque - -# lib/tasks/resque.rake -require 'resque/tasks' -require 'resque/scheduler/tasks' -``` - -**Option B: Migrate to Solid Queue (Rails 8.1)** +**Background Queue with Solid Queue (Rails 8.1)** ```ruby # config/application.rb config.active_job.queue_adapter = :solid_queue diff --git a/app/jobs/concerns/document_rescuable_job.rb b/app/jobs/concerns/document_rescuable_job.rb index d47778e..86e21ea 100644 --- a/app/jobs/concerns/document_rescuable_job.rb +++ b/app/jobs/concerns/document_rescuable_job.rb @@ -11,9 +11,9 @@ module DocumentRescuableJob options = (arguments[1] || {}).with_indifferent_access unless options[:preview] document = Document.find(document_id) - document.reload.with_lock do + document.with_lock do data = document.links[self.class::LINK_KEY]&.slice("preview") || {} - document.update_columns(links: document.reload.links.merge(self.class::LINK_KEY => data)) + document.update_columns(links: document.links.merge(self.class::LINK_KEY => data)) end store_result({ ok: false, link: document_path(document_id), diff --git a/app/jobs/concerns/material_rescuable_job.rb b/app/jobs/concerns/material_rescuable_job.rb index 700cac0..0b053d2 100644 --- a/app/jobs/concerns/material_rescuable_job.rb +++ b/app/jobs/concerns/material_rescuable_job.rb @@ -11,7 +11,7 @@ module MaterialRescuableJob material = 
MaterialPresenter.new(material_id) options = (arguments[1] || {}).with_indifferent_access unless options[:preview] - material.reload.with_lock do + material.with_lock do material.update_columns(links: material.links.merge(self.class::LINK_KEY => {})) end store_result({ ok: false, diff --git a/app/jobs/concerns/retry_delayed.rb b/app/jobs/concerns/retry_delayed.rb index b20a0e6..89c5a71 100644 --- a/app/jobs/concerns/retry_delayed.rb +++ b/app/jobs/concerns/retry_delayed.rb @@ -1,27 +1,36 @@ # frozen_string_literal: true -require "activejob/retry" - module RetryDelayed extend ActiveSupport::Concern - module RetryBackoffStrategy - MIN_DELAY_MULTIPLIER = 1.0 - MAX_DELAY_MULTIPLIER = 5.0 - RETRY_DELAYES = [30.seconds, 1.minute, 3.minutes, 7.minutes].freeze + RETRY_DELAYS = [30.seconds, 1.minute, 3.minutes, 7.minutes].freeze + MIN_DELAY_MULTIPLIER = 1.0 + MAX_DELAY_MULTIPLIER = 5.0 - def self.should_retry?(retry_attempt, exception) - return false if exception.message =~ /Script error message/ && exception.message =~ /PAGE_BREAK/ + # Google Apps Script PAGE_BREAK errors are non-recoverable + class NonRecoverableScriptError < StandardError; end - retry_attempt <= RETRY_DELAYES.size - end + included do + # Order matters: rescue_from handlers are checked in reverse declaration order. + # retry_on is declared first (checked second), discard_on second (checked first). 
+ retry_on StandardError, + attempts: RETRY_DELAYS.size + 1, + wait: ->(executions) { + delay = RETRY_DELAYS[executions - 1] || RETRY_DELAYS.last + delay * rand(MIN_DELAY_MULTIPLIER..MAX_DELAY_MULTIPLIER) + } - def self.retry_delay(retry_attempt, _exception) - (RETRY_DELAYES[retry_attempt] || 0) * rand(MIN_DELAY_MULTIPLIER..MAX_DELAY_MULTIPLIER) - end - end + discard_on NonRecoverableScriptError - included do - include ::ActiveJob::Retry.new(strategy: RetryBackoffStrategy) + # Convert PAGE_BREAK script errors to NonRecoverableScriptError so discard_on can catch them + around_perform do |_job, block| + block.call + rescue StandardError => e + if e.message =~ /Script error message/ && e.message =~ /PAGE_BREAK/ + raise NonRecoverableScriptError, e.message + end + + raise + end end end diff --git a/app/jobs/concerns/retry_simple.rb b/app/jobs/concerns/retry_simple.rb deleted file mode 100644 index 2a17c3e..0000000 --- a/app/jobs/concerns/retry_simple.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -require "activejob/retry" - -module RetrySimple - extend ActiveSupport::Concern - - included do - include ActiveJob::Retry.new(strategy: :constant, limit: 3) - end -end diff --git a/spec/jobs/concerns/retry_delayed_spec.rb b/spec/jobs/concerns/retry_delayed_spec.rb new file mode 100644 index 0000000..2437a4b --- /dev/null +++ b/spec/jobs/concerns/retry_delayed_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require "rails_helper" + +RSpec.describe RetryDelayed do + include ActiveJob::TestHelper + + let(:test_class) do + Class.new(ApplicationJob) do + include RetryDelayed + + queue_as :default + + def perform(should_fail: false, error_message: "something went wrong") + raise StandardError, error_message if should_fail + end + end + end + + around do |example| + original_adapter = ActiveJob::Base.queue_adapter + ActiveJob::Base.queue_adapter = :test + example.run + ensure + ActiveJob::Base.queue_adapter = original_adapter + end + + before { 
stub_const("TestRetryDelayedJob", test_class) } + + describe "retry on StandardError" do + it "retries up to #{RetryDelayed::RETRY_DELAYS.size} times" do + perform_enqueued_jobs(only: TestRetryDelayedJob) do + TestRetryDelayedJob.perform_later(should_fail: true) + rescue StandardError + # Expected after all retries exhausted + end + + # 1 original + 4 retries = 5 attempts + expect(performed_jobs.count { |j| j["job_class"] == "TestRetryDelayedJob" }).to eq(5) + end + end + + describe "PAGE_BREAK script errors are discarded" do + it "does not retry when message contains both 'Script error message' and 'PAGE_BREAK'" do + error_message = "Script error message: PAGE_BREAK not allowed" + + perform_enqueued_jobs(only: TestRetryDelayedJob) do + TestRetryDelayedJob.perform_later(should_fail: true, error_message: error_message) + end + + # Discarded after 1 attempt — no retries + expect(performed_jobs.count { |j| j["job_class"] == "TestRetryDelayedJob" }).to eq(1) + end + + it "still retries when only 'PAGE_BREAK' is present without 'Script error message'" do + error_message = "PAGE_BREAK encountered" + + perform_enqueued_jobs(only: TestRetryDelayedJob) do + TestRetryDelayedJob.perform_later(should_fail: true, error_message: error_message) + rescue StandardError + # Expected after retries exhausted + end + + expect(performed_jobs.count { |j| j["job_class"] == "TestRetryDelayedJob" }).to eq(5) + end + end + + describe "delay calculation" do + it "uses escalating delays" do + expect(RetryDelayed::RETRY_DELAYS).to eq([30.seconds, 1.minute, 3.minutes, 7.minutes]) + end + end +end