Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
- [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support
- [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime`
- [289](https://github.com/Shopify/job-iteration/pull/289) - Fix uninitialized constant error when raising `ConditionNotSupportedError` from `ActiveRecordBatchEnumerator`
- [310](https://github.com/Shopify/job-iteration/pull/310) - Support nested iteration
- [338](https://github.com/Shopify/job-iteration/pull/338) - All logs are now `ActiveSupport::Notifications` events and logged using `ActiveSupport::LogSubscriber` to allow customization. Events now always include the `cursor_position` tag.
- [341](https://github.com/Shopify/job-iteration/pull/341) - Add `JobIteration.default_retry_backoff`, which sets a default delay when jobs are re-enqueued after being interrupted. Defaults to `nil`, meaning no delay, which matches the current behaviour.

Expand Down
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ class CsvJob < ApplicationJob
end
```

```ruby
class NestedIterationJob < ApplicationJob
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.nested(
[
->(cursor) { enumerator_builder.active_record_on_records(Shop.all, cursor: cursor) },
->(shop, cursor) { enumerator_builder.active_record_on_records(shop.products, cursor: cursor) },
->(_shop, product, cursor) { enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor) }
],
cursor: cursor
)
end

def each_iteration(product_variants_relation)
# do something
end
end
```

Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required.

## Guides
Expand Down Expand Up @@ -169,8 +190,6 @@ There a few configuration assumptions that are required for Iteration to work wi

**Why is it important that `each_iteration` takes less than 30 seconds?** When the job worker is scheduled for restart or shutdown, it gets a notice to finish remaining unit of work. To guarantee that no progress is lost we need to make sure that `each_iteration` completes within a reasonable amount of time.

**What do I do if each iteration takes a long time, because it's doing nested operations?** If your `each_iteration` is complex, we recommend enqueuing another job, which will run your nested business logic. We may expose primitives in the future to do this more effectively, but this is not terribly common today.

**Why do I have to use this ugly helper in `build_enumerator`? Why can't you automatically infer it?** This is how the first version of the API worked. We checked the type of object returned by `build_enumerable`, and depending on whether it was an ActiveRecord Relation or an Array, we used the matching adapter. This caused opaque type branching in Iteration internals and it didn’t allow developers to craft their own Enumerators and control the cursor value. We made a decision to _always_ return an Enumerator instance from `build_enumerator`. Now we provide explicit helpers to convert an ActiveRecord Relation or an Array to an Enumerator, and for more complex iteration flows developers can build their own `Enumerator` objects.

**What is the difference between Enumerable and Enumerator?** We recommend [this post](http://blog.arkency.com/2014/01/ruby-to-enum-for-enumerator/) to learn more about Enumerators in Ruby.
Expand Down
36 changes: 36 additions & 0 deletions lib/job-iteration/enumerator_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require_relative "./active_record_enumerator"
require_relative "./csv_enumerator"
require_relative "./throttle_enumerator"
require_relative "./nested_enumerator"
require "forwardable"

module JobIteration
Expand Down Expand Up @@ -146,6 +147,40 @@ def build_csv_enumerator(enumerable, cursor:)
CsvEnumerator.new(enumerable).rows(cursor: cursor)
end

# Builds an Enumerator that walks several enumerators nested inside one another.
#
# @param enums [Array<Proc>] procs, outermost first, each returning an Enumerator.
#   A proc is called with the items currently yielded by its parent enumerators,
#   followed by its own `cursor`.
#   That `cursor` is the proc's own entry of the `cursor` array passed to `build_enumerator`.
# @param cursor [Array<Object>] one offset per entry of `enums` to start iteration from
#
# @example
#   def build_enumerator(cursor:)
#     enumerator_builder.nested(
#       [
#         ->(cursor) {
#           enumerator_builder.active_record_on_records(Shop.all, cursor: cursor)
#         },
#         ->(shop, cursor) {
#           enumerator_builder.active_record_on_records(shop.products, cursor: cursor)
#         },
#         ->(_shop, product, cursor) {
#           enumerator_builder.active_record_on_batch_relations(product.product_variants, cursor: cursor)
#         }
#       ],
#       cursor: cursor
#     )
#   end
#
#   def each_iteration(product_variants_relation)
#     # do something
#   end
#
def build_nested_enumerator(enums, cursor:)
  nested_enumerator = NestedEnumerator.new(enums, cursor: cursor)
  nested_enumerator.each
end

alias_method :once, :build_once_enumerator
alias_method :times, :build_times_enumerator
alias_method :array, :build_array_enumerator
Expand All @@ -154,6 +189,7 @@ def build_csv_enumerator(enumerable, cursor:)
alias_method :active_record_on_batch_relations, :build_active_record_enumerator_on_batch_relations
alias_method :throttle, :build_throttle_enumerator
alias_method :csv, :build_csv_enumerator
alias_method :nested, :build_nested_enumerator

private

Expand Down
2 changes: 1 addition & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def iterate_with_enumerator(enumerator, arguments)
# Deferred until 2.0.0
# assert_valid_cursor!(cursor_from_enumerator)

tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator)
tags = instrumentation_tags.merge(cursor_position: cursor_from_enumerator.dup)
ActiveSupport::Notifications.instrument("each_iteration.iteration", tags) do
found_record = true
each_iteration(object_from_enumerator, *arguments)
Expand Down
46 changes: 46 additions & 0 deletions lib/job-iteration/nested_enumerator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true

module JobIteration
  # @private
  #
  # Drives a chain of enumerator-building procs as one nested iteration.
  #
  # Each proc is called with the objects currently yielded by the enclosing
  # levels followed by that level's cursor, and must return something whose
  # `each` yields `(object, cursor)` pairs. Only objects of the innermost level
  # are yielded to the consumer, together with the array of per-level cursors.
  class NestedEnumerator
    # @param enums [Array<Proc>] one proc per nesting level, outermost first
    # @param cursor [Array<Object>, nil] one cursor per level, or nil to start every level from scratch
    # @raise [ArgumentError] if any entry of `enums` is not a Proc, or if `cursor`
    #   is given with a different size than `enums`
    def initialize(enums, cursor: nil)
      unless enums.all?(Proc)
        raise ArgumentError, "enums must contain only procs/lambdas"
      end

      if cursor && enums.size != cursor.size
        raise ArgumentError, "cursor should have one object per enum"
      end

      @enums = enums
      # `iterate` overwrites cursor slots in place, so work on a private copy:
      # the caller's array must not change under their feet, and it may be frozen.
      @cursors = cursor ? cursor.dup : Array.new(enums.size)
    end

    # Yields every object of the innermost level along with the cursors array.
    #
    # The yielded array is this enumerator's own working state and keeps being
    # updated as iteration advances; consumers that want to keep a snapshot
    # need to `dup` it.
    #
    # @yieldparam object [Object] item produced by the innermost enumerator
    # @yieldparam cursors [Array<Object>] current cursor of every level
    # @return [Enumerator] when called without a block
    def each(&block)
      return to_enum unless block_given?

      iterate([], 0, &block)
    end

    private

    # Walks level `index`, recursing into deeper levels for each object.
    #
    # @param current_objects [Array<Object>] objects yielded by the enclosing levels
    # @param index [Integer] nesting level being iterated
    def iterate(current_objects, index, &block)
      enumerator = @enums[index].call(*current_objects, @cursors[index])

      enumerator.each do |object_from_enumerator, cursor_from_enumerator|
        if index == @cursors.size - 1
          # we've reached the innermost enumerator, yield for `iterate_with_enumerator`
          yield object_from_enumerator, @cursors
        else
          # we need to go deeper
          next_index = index + 1
          iterate(current_objects + [object_from_enumerator], next_index, &block)
          # reset cursor at the index of the nested enumerator that just finished, so we don't skip items when that
          # index is reused in the next nested iteration
          @cursors[next_index] = nil
        end
        # Recorded only once this object has been fully handled, so the cursors
        # seen while it is being processed still point at the position before it.
        @cursors[index] = cursor_from_enumerator
      end
    end
  end
end
44 changes: 34 additions & 10 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ def enqueue_at(job, timestamp)
ActiveJob::Base.queue_adapter = :iteration_test

# Test model; a product owns any number of comments.
class Product < ActiveRecord::Base
has_many :comments
end

# Test model for records nested under a Product.
class Comment < ActiveRecord::Base
belongs_to :product
end

# Test model identified by a composite (origin, destination) primary key.
class TravelRoute < ActiveRecord::Base
self.primary_key = [:origin, :destination]
end

class TravelRoute < ActiveRecord::Base
Expand Down Expand Up @@ -72,14 +81,21 @@ class TravelRoute < ActiveRecord::Base
config.redis = { host: host }
end

ActiveRecord::Base.connection.create_table(:products, force: true) do |t|
t.string(:name)
t.timestamps
end
ActiveRecord::Schema.define do
create_table(:products, force: true) do |t|
t.string(:name)
t.timestamps
end

create_table(:comments, force: true) do |t|
t.string(:content)
t.belongs_to(:product)
end

ActiveRecord::Base.connection.create_table(:travel_routes, force: true, primary_key: [:origin, :destination]) do |t|
t.string(:destination)
t.string(:origin)
create_table(:travel_routes, force: true, primary_key: [:origin, :destination]) do |t|
t.string(:destination)
t.string(:origin)
end
end

module LoggingHelpers
Expand Down Expand Up @@ -129,14 +145,22 @@ class IterationUnitTest < ActiveSupport::TestCase
private

def insert_fixtures
10.times do |n|
Product.create!(name: "lipstick #{n}")
end
now = Time.now
products = 10.times.map { |n| { name: "lipstick #{n}", created_at: now - n, updated_at: now - n } }
Product.insert_all!(products)

comments = Product.order(:id).limit(3).map.with_index do |product, index|
comments_count = index + 1
comments_count.times.map { |n| { content: "#{product.name} comment ##{n}", product_id: product.id } }
end.flatten

Comment.insert_all!(comments)
end

# Empties the table behind every fixture model.
def truncate_fixtures
  [TravelRoute, Product, Comment].each do |model|
    ActiveRecord::Base.connection.truncate(model.table_name)
  end
end

def with_global_default_retry_backoff(backoff)
Expand Down
14 changes: 14 additions & 0 deletions test/unit/enumerator_builder_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ class EnumeratorBuilderTest < ActiveSupport::TestCase
enumerator_builder(wraps: 0).build_csv_enumerator(CSV.new("test"), cursor: nil)
end

# Exercises `build_nested_enumerator` with two levels: all products on the outside
# and each product's comments on the inside, starting without a cursor.
# NOTE(review): `test_builder_method` is defined outside this view; presumably it also
# marks the builder as covered for the "methods not tested" check below — confirm.
test_builder_method(:build_nested_enumerator) do
enumerator_builder(wraps: 0).build_nested_enumerator(
[
->(cursor) {
enumerator_builder.build_active_record_enumerator_on_records(Product.all, cursor: cursor)
},
->(product, cursor) {
enumerator_builder.build_active_record_enumerator_on_records(product.comments, cursor: cursor)
},
],
cursor: nil,
)
end

# checks that all the non-alias methods were tested
raise "methods not tested: #{methods.inspect}" unless methods.empty?

Expand Down
44 changes: 44 additions & 0 deletions test/unit/iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,26 @@ def job_should_exit?
end
end

# Job fixture whose enumerator nests a `times(4)` enumerator inside a `times(3)` one.
class JobWithNestedEnumerator < ActiveJob::Base
  include JobIteration::Iteration

  # Each level gets its own cursor from the nested enumerator, so the lambdas
  # use distinct parameter names instead of shadowing the `cursor:` keyword.
  def build_enumerator(_params, cursor:)
    outer_times = ->(outer_cursor) { enumerator_builder.build_times_enumerator(3, cursor: outer_cursor) }
    inner_times = ->(_integer, inner_cursor) { enumerator_builder.build_times_enumerator(4, cursor: inner_cursor) }

    enumerator_builder.nested([outer_times, inner_times], cursor: cursor)
  end

  # Deliberately empty: only the instrumentation around each iteration is under test.
  def each_iteration(*)
  end
end

def test_jobs_that_define_build_enumerator_and_each_iteration_will_not_raise
push(JobWithRightMethods, "walrus" => "best")
work_one_job
Expand Down Expand Up @@ -441,6 +461,30 @@ def test_exception_in_each_iteration_instrumentation
assert_equal(expected, events)
end

# Every "each_iteration.iteration" event of the nested job must be tagged with
# the job class and the [outer, inner] cursor pair in effect for that iteration.
def test_nested_each_iteration_instrumentation
  recorded_tags = []
  subscriber = ->(_name, _started, _finished, _id, tags) { recorded_tags << tags }
  ActiveSupport::Notifications.subscribed(subscriber, "each_iteration.iteration") do
    JobWithNestedEnumerator.perform_now({})
  end

  # Outer cursor takes nil, 0, 1 and, for each of those, the inner cursor takes nil, 0, 1, 2.
  expected = [nil, 0, 1].product([nil, 0, 1, 2]).map do |outer_cursor, inner_cursor|
    { job_class: "JobIterationTest::JobWithNestedEnumerator", cursor_position: [outer_cursor, inner_cursor] }
  end
  assert_equal(expected, recorded_tags)
end

private

# Allows building job classes that read max_job_runtime during the test,
Expand Down
82 changes: 82 additions & 0 deletions test/unit/nested_enumerator_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# frozen_string_literal: true

require "test_helper"

module JobIteration
  # Unit tests for NestedEnumerator, by default nesting each product's comments
  # inside the products themselves (see #build_enumerator below).
  class NestedEnumeratorTest < IterationUnitTest
    test "accepts only callables as enums" do
      error = assert_raises(ArgumentError) do
        build_enumerator(outer: [[1, 2, 3].each])
      end
      assert_equal("enums must contain only procs/lambdas", error.message)
    end

    test "raises when cursor is not of the same size as enums" do
      error = assert_raises(ArgumentError) do
        build_enumerator(cursor: [Product.first.id])
      end
      assert_equal("cursor should have one object per enum", error.message)
    end

    test "yields enumerator when called without a block" do
      enum = build_enumerator
      assert_kind_of(Enumerator, enum)
      assert_nil enum.size
    end

    test "yields every nested record with their cursor position" do
      enum = build_enumerator

      products = Product.includes(:comments).order(:id).take(3)
      comments = products.flat_map { |product| product.comments.sort_by(&:id) }
      cursors = [[nil, nil], [1, nil], [1, 2], [2, nil], [2, 4], [2, 5]]

      enum.each_with_index do |(comment, cursor), index|
        expected_comment = comments[index]
        expected_cursor = cursors[index]
        assert_equal(expected_comment, comment)
        assert_equal(expected_cursor, cursor)
      end
    end

    test "cursor can be used to resume" do
      enum = build_enumerator
      # NOTE(review): the cursor object yielded here looks like the enumerator's own
      # working array, so `first_cursor` has presumably already advanced by the time
      # it is reused below — confirm this is the intended way to obtain a resume point.
      _first_comment, first_cursor = enum.next
      second_comment, second_cursor = enum.next

      enum = build_enumerator(cursor: first_cursor)
      assert_equal([second_comment, second_cursor], enum.first)
    end

    test "doesn't yield anything if contains empty enum" do
      # Inner procs receive the parent record first and their own cursor last.
      enum = ->(_product, cursor) { records_enumerator(Comment.none, cursor: cursor) }
      enum = build_enumerator(inner: enum)
      assert_empty(enum.to_a)
    end

    test "works with single level nesting" do
      enum = build_enumerator(inner: nil)
      products = Product.order(:id).to_a
      cursors = [nil] + (1..9).to_a

      enum.each_with_index do |(product, cursor), index|
        assert_equal(products[index], product)
        assert_equal([cursors[index]], cursor)
      end
    end

    private

    # Builds the enumerator under test. Passing `inner: nil` drops the second
    # level, leaving a single-level nesting over products only.
    def build_enumerator(
      outer: ->(cursor) { records_enumerator(Product.all, cursor: cursor) },
      inner: ->(product, cursor) { records_enumerator(product.comments, cursor: cursor) },
      cursor: nil
    )
      NestedEnumerator.new([outer, inner].compact, cursor: cursor).each
    end

    # Record-by-record enumerator over `scope`, starting from `cursor`.
    def records_enumerator(scope, cursor: nil)
      ActiveRecordEnumerator.new(scope, cursor: cursor).records
    end
  end
end