From d6ba2c962188c09eaecbe34adf602e0dac3ddfec Mon Sep 17 00:00:00 2001 From: fatkodima Date: Wed, 2 Nov 2022 02:32:53 +0200 Subject: [PATCH] Support nested iteration Co-authored-by: Bart de Water <496367+bdewater@users.noreply.github.com> --- CHANGELOG.md | 1 + README.md | 23 ++++++- lib/job-iteration/enumerator_builder.rb | 36 +++++++++++ lib/job-iteration/iteration.rb | 2 +- lib/job-iteration/nested_enumerator.rb | 46 ++++++++++++++ test/test_helper.rb | 44 ++++++++++--- test/unit/enumerator_builder_test.rb | 14 +++++ test/unit/iteration_test.rb | 44 +++++++++++++ test/unit/nested_enumerator_test.rb | 82 +++++++++++++++++++++++++ 9 files changed, 279 insertions(+), 13 deletions(-) create mode 100644 lib/job-iteration/nested_enumerator.rb create mode 100644 test/unit/nested_enumerator_test.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index ce474a92..fb424f37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support - [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime` - [289](https://github.com/Shopify/job-iteration/pull/289) - Fix uninitialized constant error when raising `ConditionNotSupportedError` from `ActiveRecordBatchEnumerator` +- [310](https://github.com/Shopify/job-iteration/pull/310) - Support nested iteration - [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag. - [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour. 
diff --git a/README.md b/README.md index 6cb98d9d..306b375f 100644 --- a/README.md +++ b/README.md @@ -130,6 +130,27 @@ class CsvJob < ApplicationJob end ``` +```ruby +class NestedIterationJob < ApplicationJob + include JobIteration::Iteration + + def build_enumerator(cursor:) + enumerator_builder.nested( + [ + ->(cursor) { enumerator_builder.active_record_on_records(Shop.all, cursor: cursor) }, + ->(shop, cursor) { enumerator_builder.active_record_on_records(shop.products, cursor: cursor) }, + ->(_shop, product, cursor) { enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor) } + ], + cursor: cursor + ) + end + + def each_iteration(product_variants_relation) + # do something + end +end +``` + Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required. ## Guides @@ -169,8 +190,6 @@ There a few configuration assumptions that are required for Iteration to work wi **Why is it important that `each_iteration` takes less than 30 seconds?** When the job worker is scheduled for restart or shutdown, it gets a notice to finish remaining unit of work. To guarantee that no progress is lost we need to make sure that `each_iteration` completes within a reasonable amount of time. -**What do I do if each iteration takes a long time, because it's doing nested operations?** If your `each_iteration` is complex, we recommend enqueuing another job, which will run your nested business logic. We may expose primitives in the future to do this more effectively, but this is not terribly common today. - **Why do I use have to use this ugly helper in `build_enumerator`? Why can't you automatically infer it?** This is how the first version of the API worked. We checked the type of object returned by `build_enumerable`, and whether it was ActiveRecord Relation or an Array, we used the matching adapter. 
This caused opaque type branching in Iteration internals and it didn’t allow developers to craft their own Enumerators and control the cursor value. We made a decision to _always_ return Enumerator instance from `build_enumerator`. Now we provide explicit helpers to convert ActiveRecord Relation or an Array to Enumerator, and for more complex iteration flows developers can build their own `Enumerator` objects. **What is the difference between Enumerable and Enumerator?** We recomend [this post](http://blog.arkency.com/2014/01/ruby-to-enum-for-enumerator/) to learn more about Enumerators in Ruby. diff --git a/lib/job-iteration/enumerator_builder.rb b/lib/job-iteration/enumerator_builder.rb index 7587a9a1..46e2756d 100644 --- a/lib/job-iteration/enumerator_builder.rb +++ b/lib/job-iteration/enumerator_builder.rb @@ -4,6 +4,7 @@ require_relative "./active_record_enumerator" require_relative "./csv_enumerator" require_relative "./throttle_enumerator" +require_relative "./nested_enumerator" require "forwardable" module JobIteration @@ -146,6 +147,40 @@ def build_csv_enumerator(enumerable, cursor:) CsvEnumerator.new(enumerable).rows(cursor: cursor) end + # Builds Enumerator for nested iteration. + # + # @param enums [Array] an Array of Procs, each should return an Enumerator. + # Each proc from enums should accept the yielded items from the parent enumerators + # and the `cursor` as its arguments. + # Each proc's `cursor` argument is its part from the `build_enumerator`'s `cursor` array. 
+ # @param cursor [Array] array of offsets for each of the enums to start iteration from + # + # @example + # def build_enumerator(cursor:) + # enumerator_builder.nested( + # [ + # ->(cursor) { + # enumerator_builder.active_record_on_records(Shop.all, cursor: cursor) + # }, + # ->(shop, cursor) { + # enumerator_builder.active_record_on_records(shop.products, cursor: cursor) + # }, + # ->(_shop, product, cursor) { + # enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor) + # } + # ], + # cursor: cursor + # ) + # end + # + # def each_iteration(product_variants_relation) + # # do something + # end + # + def build_nested_enumerator(enums, cursor:) + NestedEnumerator.new(enums, cursor: cursor).each + end + alias_method :once, :build_once_enumerator alias_method :times, :build_times_enumerator alias_method :array, :build_array_enumerator @@ -154,6 +189,7 @@ def build_csv_enumerator(enumerable, cursor:) alias_method :active_record_on_batch_relations, :build_active_record_enumerator_on_batch_relations alias_method :throttle, :build_throttle_enumerator alias_method :csv, :build_csv_enumerator + alias_method :nested, :build_nested_enumerator private diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index acccdc6b..fb9ebf47 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -189,7 +189,7 @@ def iterate_with_enumerator(enumerator, arguments) # Deferred until 2.0.0 # assert_valid_cursor!(cursor_from_enumerator) - tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator) + tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator.dup) ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do found_record = true each_iteration(object_from_enumerator, *arguments) diff --git a/lib/job-iteration/nested_enumerator.rb b/lib/job-iteration/nested_enumerator.rb new file mode 100644 index 00000000..5aab9d7f --- /dev/null +++ 
b/lib/job-iteration/nested_enumerator.rb
@@ -0,0 +1,70 @@
+# frozen_string_literal: true
+
+module JobIteration
+  # Chains several enumerator-building procs into one nested iteration and
+  # tracks one cursor slot per nesting level.
+  #
+  # @private
+  class NestedEnumerator
+    # @param enums [Array<Proc>] one proc per nesting level; each is called with
+    #   the objects currently yielded by the outer levels followed by that
+    #   level's cursor, and should return an enumerator yielding
+    #   `[object, cursor]` pairs
+    # @param cursor [Array, nil] one entry per proc to resume from, or nil to
+    #   start every level from the beginning
+    # @raise [ArgumentError] if an element of enums is not a Proc, or if cursor
+    #   does not have exactly one entry per enum
+    def initialize(enums, cursor: nil)
+      unless enums.all?(Proc)
+        raise ArgumentError, "enums must contain only procs/lambdas"
+      end
+
+      if cursor && enums.size != cursor.size
+        raise ArgumentError, "cursor should have one object per enum"
+      end
+
+      @enums = enums
+      # Work on a copy: #iterate writes into this array while iterating, and
+      # the array handed in by the caller must not change underneath them.
+      @cursors = cursor ? cursor.dup : Array.new(enums.size)
+    end
+
+    # Runs the nested iteration, yielding each innermost object together with
+    # the array of per-level cursors.
+    #
+    # NOTE: the same array instance is yielded every time and is updated in
+    # place as iteration advances; callers that keep a cursor should copy it.
+    #
+    # @return [Enumerator] when called without a block
+    def each(&block)
+      return to_enum unless block_given?
+
+      iterate([], 0, &block)
+    end
+
+    private
+
+    # Walks the enumerator at `index`, recursing one level deeper for every
+    # object until the innermost level is reached.
+    def iterate(current_objects, index, &block)
+      enumerator = @enums[index].call(*current_objects, @cursors[index])
+
+      enumerator.each do |object_from_enumerator, cursor_from_enumerator|
+        if index == @cursors.size - 1
+          # we've reached the innermost enumerator, yield for `iterate_with_enumerator`
+          yield object_from_enumerator, @cursors
+        else
+          # we need to go deeper
+          next_index = index + 1
+          iterate(current_objects + [object_from_enumerator], next_index, &block)
+          # reset cursor at the index of the nested enumerator that just finished, so we don't skip items when that
+          # index is reused in the next nested iteration
+          @cursors[next_index] = nil
+        end
+        # Recorded only after the object has been fully handled, so the cursor
+        # yielded above still holds the previous object's position at this level.
+        @cursors[index] = cursor_from_enumerator
+      end
+    end
+  end
+end
diff --git a/test/test_helper.rb b/test/test_helper.rb
index e8b1ccdc..828f18be 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -42,6 +42,15 @@ def enqueue_at(job, timestamp)
 ActiveJob::Base.queue_adapter = :iteration_test
 
 class Product < ActiveRecord::Base
+  has_many :comments
+end
+
+class Comment < ActiveRecord::Base
+  belongs_to :product
+end
+
+class TravelRoute < ActiveRecord::Base
+  self.primary_key = [:origin, :destination]
 end
 
 class TravelRoute < ActiveRecord::Base
@@ -72,14 +81,21 @@ class TravelRoute < ActiveRecord::Base
   config.redis = { host: host }
 end
 
-ActiveRecord::Base.connection.create_table(:products, force: true) do |t|
-  t.string(:name)
-  t.timestamps
-end
+ActiveRecord::Schema.define do + create_table(:products, force: true) do |t| + t.string(:name) + t.timestamps + end + + create_table(:comments, force: true) do |t| + t.string(:content) + t.belongs_to(:product) + end -ActiveRecord::Base.connection.create_table(:travel_routes, force: true, primary_key: [:origin, :destination]) do |t| - t.string(:destination) - t.string(:origin) + create_table(:travel_routes, force: true, primary_key: [:origin, :destination]) do |t| + t.string(:destination) + t.string(:origin) + end end module LoggingHelpers @@ -129,14 +145,22 @@ class IterationUnitTest < ActiveSupport::TestCase private def insert_fixtures - 10.times do |n| - Product.create!(name: "lipstick #{n}") - end + now = Time.now + products = 10.times.map { |n| { name: "lipstick #{n}", created_at: now - n, updated_at: now - n } } + Product.insert_all!(products) + + comments = Product.order(:id).limit(3).map.with_index do |product, index| + comments_count = index + 1 + comments_count.times.map { |n| { content: "#{product.name} comment ##{n}", product_id: product.id } } + end.flatten + + Comment.insert_all!(comments) end def truncate_fixtures ActiveRecord::Base.connection.truncate(TravelRoute.table_name) ActiveRecord::Base.connection.truncate(Product.table_name) + ActiveRecord::Base.connection.truncate(Comment.table_name) end def with_global_default_retry_backoff(backoff) diff --git a/test/unit/enumerator_builder_test.rb b/test/unit/enumerator_builder_test.rb index f6cad033..4126fac9 100644 --- a/test/unit/enumerator_builder_test.rb +++ b/test/unit/enumerator_builder_test.rb @@ -60,6 +60,20 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase enumerator_builder(wraps: 0).build_csv_enumerator(CSV.new("test"), cursor: nil) end + test_builder_method(:build_nested_enumerator) do + enumerator_builder(wraps: 0).build_nested_enumerator( + [ + ->(cursor) { + enumerator_builder.build_active_record_enumerator_on_records(Product.all, cursor: cursor) + }, + ->(product, cursor) { + 
enumerator_builder.build_active_record_enumerator_on_records(product.comments, cursor: cursor) + }, + ], + cursor: nil, + ) + end + # checks that all the non-alias methods were tested raise "methods not tested: #{methods.inspect}" unless methods.empty? diff --git a/test/unit/iteration_test.rb b/test/unit/iteration_test.rb index 6360e71c..4f89dc3c 100644 --- a/test/unit/iteration_test.rb +++ b/test/unit/iteration_test.rb @@ -178,6 +178,26 @@ def job_should_exit? end end + class JobWithNestedEnumerator < ActiveJob::Base + include JobIteration::Iteration + def build_enumerator(_params, cursor:) + enumerator_builder.nested( + [ + ->(cursor) { + enumerator_builder.build_times_enumerator(3, cursor: cursor) + }, + ->(_integer, cursor) { + enumerator_builder.build_times_enumerator(4, cursor: cursor) + }, + ], + cursor: cursor, + ) + end + + def each_iteration(*) + end + end + def test_jobs_that_define_build_enumerator_and_each_iteration_will_not_raise push(JobWithRightMethods, "walrus" => "best") work_one_job @@ -441,6 +461,30 @@ def test_exception_in_each_iteration_instrumentation assert_equal(expected, events) end + def test_nested_each_iteration_instrumentation + events = [] + callback = lambda { |_, _, _, _, tags| events << tags } + ActiveSupport::Notifications.subscribed(callback, "each_iteration.iteration") do + JobWithNestedEnumerator.perform_now({}) + end + + expected = [ + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [nil, nil] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [nil, 0] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [nil, 1] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [nil, 2] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [0, nil] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [0, 0] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", 
cursor_position: [0, 1] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [0, 2] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [1, nil] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [1, 0] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [1, 1] }, + { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [1, 2] }, + ] + assert_equal(expected, events) + end + private # Allows building job classes that read max_job_runtime during the test, diff --git a/test/unit/nested_enumerator_test.rb b/test/unit/nested_enumerator_test.rb new file mode 100644 index 00000000..1b4b2186 --- /dev/null +++ b/test/unit/nested_enumerator_test.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +require "test_helper" + +module JobIteration + class NestedEnumeratorTest < IterationUnitTest + test "accepts only callables as enums" do + error = assert_raises(ArgumentError) do + build_enumerator(outer: [[1, 2, 3].each]) + end + assert_equal("enums must contain only procs/lambdas", error.message) + end + + test "raises when cursor is not of the same size as enums" do + error = assert_raises(ArgumentError) do + build_enumerator(cursor: [Product.first.id]) + end + assert_equal("cursor should have one object per enum", error.message) + end + + test "yields enumerator when called without a block" do + enum = build_enumerator + assert enum.is_a?(Enumerator) + assert_nil enum.size + end + + test "yields every nested record with their cursor position" do + enum = build_enumerator + + products = Product.includes(:comments).order(:id).take(3) + comments = products.flat_map { |product| product.comments.sort_by(&:id) } + cursors = [[nil, nil], [1, nil], [1, 2], [2, nil], [2, 4], [2, 5]] + + enum.each_with_index do |(comment, cursor), index| + expected_comment = comments[index] + expected_cursor = cursors[index] + assert_equal(expected_comment, comment) + 
assert_equal(expected_cursor, cursor)
+      end
+    end
+
+    test "cursor can be used to resume" do
+      enum = build_enumerator
+      _first_comment, first_cursor = enum.next
+      second_comment, second_cursor = enum.next
+
+      enum = build_enumerator(cursor: first_cursor)
+      assert_equal([second_comment, second_cursor], enum.first)
+    end
+
+    test "doesn't yield anything if contains empty enum" do
+      enum = ->(_product, cursor) { records_enumerator(Comment.none, cursor: cursor) }
+      enum = build_enumerator(inner: enum)
+      assert_empty(enum.to_a)
+    end
+
+    test "works with single level nesting" do
+      enum = build_enumerator(inner: nil)
+      products = Product.order(:id).to_a
+      cursors = [nil] + (1..9).to_a
+
+      enum.each_with_index do |(product, cursor), index|
+        assert_equal(products[index], product)
+        assert_equal([cursors[index]], cursor)
+      end
+    end
+
+    private
+
+    def build_enumerator(
+      outer: ->(cursor) { records_enumerator(Product.all, cursor: cursor) },
+      inner: ->(product, cursor) { records_enumerator(product.comments, cursor: cursor) },
+      cursor: nil
+    )
+      NestedEnumerator.new([outer, inner].compact, cursor: cursor).each
+    end
+
+    def records_enumerator(scope, cursor: nil)
+      ActiveRecordEnumerator.new(scope, cursor: cursor).records
+    end
+  end
+end