From be37c7e4a23786851ab039519fb71eff7372ce80 Mon Sep 17 00:00:00 2001 From: Santiago Calvo Date: Tue, 21 Apr 2026 15:28:48 +0100 Subject: [PATCH] feat: add extra columns --- README.md | 81 +++- lib/staging_table.rb | 10 + lib/staging_table/adapters/base.rb | 79 +++- lib/staging_table/adapters/mysql.rb | 46 +++ lib/staging_table/adapters/postgresql.rb | 50 +++ lib/staging_table/adapters/sqlite.rb | 36 ++ lib/staging_table/bulk_inserter.rb | 6 +- lib/staging_table/conflict_resolver.rb | 169 +++++++++ lib/staging_table/session.rb | 4 +- .../transfer_strategies/insert.rb | 4 +- .../transfer_strategies/upsert.rb | 3 +- spec/staging_table/conflict_resolver_spec.rb | 350 ++++++++++++++++++ spec/staging_table/errors_spec.rb | 4 +- spec/staging_table/extra_columns_spec.rb | 209 +++++++++++ .../insert_on_conflict_integration_spec.rb | 73 ++++ 15 files changed, 1115 insertions(+), 9 deletions(-) create mode 100644 lib/staging_table/conflict_resolver.rb create mode 100644 spec/staging_table/conflict_resolver_spec.rb create mode 100644 spec/staging_table/extra_columns_spec.rb create mode 100644 spec/staging_table/insert_on_conflict_integration_spec.rb diff --git a/README.md b/README.md index d70fbc5..c72d2c4 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ end Don't let duplicates crash your party. Configure the transfer strategy to handle conflicts gracefully. ```ruby -StagingTable.stage(User, +StagingTable.stage(User, transfer_strategy: :upsert, # Default is :insert conflict_target: [:email], # Column(s) to check for conflicts conflict_action: :update # :update (overwrite) or :ignore (skip) @@ -112,6 +112,85 @@ StagingTable.stage(User, end ``` +### 📦 Extra Columns + +Need columns in your staging table that don't exist in the source model? Perfect for tracking import metadata, priorities, or processing flags. 
+ +```ruby +StagingTable.stage(User, + extra_columns: { + priority: :integer, # Simple type + processed: {type: :boolean, default: false}, # With options + import_batch: {type: :string, default: "batch_1"} + } +) do |staging| + # Insert with extra column values + staging.insert([ + {name: "John", email: "john@example.com", priority: 1}, + {name: "Jane", email: "jane@example.com", priority: 2} + ]) + + # Query using extra columns + staging.where(priority: 1).find_each do |record| + # Process high priority records first + end + + # Mark as processed + staging.where(processed: false).update_all(processed: true) + + # Extra columns are automatically excluded during transfer +end +``` + +**Supported column types:** `:string`, `:text`, `:integer`, `:bigint`, `:float`, `:decimal`, `:boolean`, `:datetime`, `:date`, `:time`, `:binary`, `:json`, `:jsonb` (PostgreSQL), `:uuid` (PostgreSQL) + +### 🔀 Staging Insert Conflict Resolution + +Handle conflicts when inserting into the staging table itself. Useful when you're aggregating data from multiple sources or processing the same records multiple times. 
+ +```ruby +StagingTable.stage(User, + extra_columns: {priority: :integer, score: :integer}, + insert_on_conflict: { + target: [:email], # Column(s) to detect conflicts + update: { + priority: :greatest, # Keep the higher value + score: :sum, # Add values together + name: :new, # Use the incoming value + age: :existing # Keep the existing value (skip update) + } + } +) do |staging| + # First batch + staging.insert([ + {email: "john@example.com", name: "John", priority: 1, score: 10} + ]) + + # Second batch - conflicts are resolved automatically + staging.insert([ + {email: "john@example.com", name: "Johnny", priority: 5, score: 20} + ]) + + # Result: priority=5 (greatest), score=30 (sum), name="Johnny" (new) +end +``` + +**Available resolution strategies:** + +| Strategy | Description | Example Result | +|----------|-------------|----------------| +| `:greatest` | Keep the larger value | `GREATEST(existing, incoming)` | +| `:least` | Keep the smaller value | `LEAST(existing, incoming)` | +| `:new` | Use the incoming value | Overwrites existing | +| `:existing` | Keep the existing value | Skips update for this column | +| `:sum` | Add values together | `existing + incoming` | +| `:coalesce` | Use incoming if not null, else existing | `COALESCE(incoming, existing)` | +| `"raw SQL"` | Custom SQL expression | `"COALESCE(excluded.col, staging.col)"` | + +**Note:** For PostgreSQL and SQLite, `target` columns are used explicitly in the `ON CONFLICT` clause and must be covered by a unique or exclusion constraint. + +For MySQL, `ON DUPLICATE KEY UPDATE` is triggered by *any* violated unique (or primary) key on the table — not just the ones listed in `:target`. If your staging table has multiple unique indexes, the update may fire on conflicts you did not intend. Make sure the column(s) you list in `:target` are the only unique constraint that matters for your use case, or remove any other unique indexes before inserting. 
The `VALUES(col)` function in `ON DUPLICATE KEY UPDATE` (emitted by the built-in strategies and often used in raw SQL strategies) is deprecated as of MySQL 8.0.20; it still works but may emit warnings. + ### 📊 Transfer Results Every transfer returns a `TransferResult` with detailed statistics: diff --git a/lib/staging_table.rb b/lib/staging_table.rb index d820640..dde0921 100644 --- a/lib/staging_table.rb +++ b/lib/staging_table.rb @@ -8,6 +8,7 @@ require "staging_table/transfer_result" require "staging_table/session" require "staging_table/model_factory" +require "staging_table/conflict_resolver" require "staging_table/bulk_inserter" require "staging_table/adapters/base" require "staging_table/adapters/postgresql" @@ -34,6 +35,15 @@ def configure # @option options [Symbol] :transfer_strategy :insert or :upsert (default: :insert) # @option options [Array] :conflict_target Columns for upsert conflict detection # @option options [Symbol] :conflict_action :update or :ignore for upsert conflicts + # @option options [Hash] :extra_columns Additional columns to add to staging table only. + # Keys are column names, values are either a symbol type (:integer, :string, :boolean, etc.) + # or a hash with :type, :default, and :null options. + # Example: { priority: :integer, processed: { type: :boolean, default: false } } + # @option options [Hash] :insert_on_conflict Conflict resolution for staging table inserts. + # :target - Column(s) to detect conflicts on (Symbol or Array of Symbols) + # :update - Hash of column => strategy pairs. Strategies: :greatest, :least, :new, + # :existing, :sum, :coalesce, or a raw SQL string. 
+ # Example: { target: [:external_id], update: { priority: :greatest, score: :least } } # @option options [Proc] :before_insert Called before inserting into staging # @option options [Proc] :after_insert Called after inserting into staging # @option options [Proc] :before_transfer Called before transferring to target diff --git a/lib/staging_table/adapters/base.rb b/lib/staging_table/adapters/base.rb index bf7e505..073d555 100644 --- a/lib/staging_table/adapters/base.rb +++ b/lib/staging_table/adapters/base.rb @@ -23,7 +23,7 @@ def self.for(connection) case adapter_name when /postgresql/ Postgresql.new(connection) - when /mysql/ + when /mysql|trilogy/ Mysql.new(connection) when /sqlite/ Sqlite.new(connection) @@ -31,6 +31,83 @@ def self.for(connection) raise AdapterError, "Unsupported adapter: #{adapter_name}. StagingTable supports PostgreSQL, MySQL, and SQLite adapters." end end + + protected + + def add_extra_columns(table_name, extra_columns) + return if extra_columns.nil? || extra_columns.empty? + + extra_columns.each do |column_name, column_spec| + add_column(table_name, column_name, column_spec) + end + end + + def add_column(table_name, column_name, column_spec) + quoted_table = connection.quote_table_name(table_name) + quoted_column = connection.quote_column_name(column_name) + column_definition = parse_column_spec(column_spec) + + sql = "ALTER TABLE #{quoted_table} ADD COLUMN #{quoted_column} #{column_definition}" + connection.execute(sql) + end + + def parse_column_spec(spec) + if spec.is_a?(Symbol) + sql_type_for(spec) + elsif spec.is_a?(Hash) + type = sql_type_for(spec[:type]) + parts = [type] + parts << "DEFAULT #{quote_default(spec[:default])}" if spec.key?(:default) + parts << "NOT NULL" if spec[:null] == false + parts.join(" ") + else + raise ConfigurationError, "Invalid column spec: #{spec.inspect}. Must be a symbol or hash with :type key." 
+ end + end + + def sql_type_for(type) + case type + when :string, :text + "TEXT" + when :integer + "INTEGER" + when :float + "REAL" + when :decimal + "DECIMAL" + when :boolean + "BOOLEAN" + when :datetime, :timestamp + "TIMESTAMP" + when :date + "DATE" + when :time + "TIME" + when :binary + "BLOB" + when :json, :jsonb + "TEXT" + else + raise ConfigurationError, "Unsupported column type: #{type.inspect}" + end + end + + def quote_default(value) + case value + when nil + "NULL" + when true + "TRUE" + when false + "FALSE" + when Numeric + value.to_s + when String + connection.quote(value) + else + connection.quote(value.to_s) + end + end end end end diff --git a/lib/staging_table/adapters/mysql.rb b/lib/staging_table/adapters/mysql.rb index b7a68eb..f5edaaf 100644 --- a/lib/staging_table/adapters/mysql.rb +++ b/lib/staging_table/adapters/mysql.rb @@ -8,6 +8,52 @@ def create_table(temp_table_name, source_table_name, options = {}) quoted_temp = connection.quote_table_name(temp_table_name) quoted_source = connection.quote_table_name(source_table_name) connection.execute("CREATE TABLE #{quoted_temp} LIKE #{quoted_source}") + + add_extra_columns(temp_table_name, options[:extra_columns]) + end + + protected + + def sql_type_for(type) + case type + when :string + "VARCHAR(255)" + when :text + "TEXT" + when :integer + "INT" + when :bigint + "BIGINT" + when :float + "DOUBLE" + when :decimal + "DECIMAL" + when :boolean + "TINYINT(1)" + when :datetime, :timestamp + "DATETIME" + when :date + "DATE" + when :time + "TIME" + when :binary + "BLOB" + when :json + "JSON" + else + raise ConfigurationError, "Unsupported column type for MySQL: #{type.inspect}" + end + end + + def quote_default(value) + case value + when true + "1" + when false + "0" + else + super + end end end end diff --git a/lib/staging_table/adapters/postgresql.rb b/lib/staging_table/adapters/postgresql.rb index e6a47eb..8e5316c 100644 --- a/lib/staging_table/adapters/postgresql.rb +++ 
b/lib/staging_table/adapters/postgresql.rb @@ -10,6 +10,56 @@ def create_table(temp_table_name, source_table_name, options = {}) sql += " INCLUDING INDEXES" if options[:include_indexes] sql += ")" connection.execute(sql) + + add_extra_columns(temp_table_name, options[:extra_columns]) + end + + protected + + def sql_type_for(type) + case type + when :string + "VARCHAR" + when :text + "TEXT" + when :integer + "INTEGER" + when :bigint + "BIGINT" + when :float + "DOUBLE PRECISION" + when :decimal + "DECIMAL" + when :boolean + "BOOLEAN" + when :datetime, :timestamp + "TIMESTAMP" + when :date + "DATE" + when :time + "TIME" + when :binary + "BYTEA" + when :json + "JSON" + when :jsonb + "JSONB" + when :uuid + "UUID" + else + raise ConfigurationError, "Unsupported column type for PostgreSQL: #{type.inspect}" + end + end + + def quote_default(value) + case value + when true + "true" + when false + "false" + else + super + end end end end diff --git a/lib/staging_table/adapters/sqlite.rb b/lib/staging_table/adapters/sqlite.rb index 7ba68bc..908f06e 100644 --- a/lib/staging_table/adapters/sqlite.rb +++ b/lib/staging_table/adapters/sqlite.rb @@ -26,6 +26,42 @@ def create_table(temp_table_name, source_table_name, options = {}) if options[:include_indexes] copy_indexes(temp_table_name, source_table_name) end + + add_extra_columns(temp_table_name, options[:extra_columns]) + end + + protected + + def sql_type_for(type) + case type + when :string, :text + "TEXT" + when :integer, :bigint + "INTEGER" + when :float, :decimal + "REAL" + when :boolean + "INTEGER" + when :datetime, :timestamp, :date, :time + "TEXT" + when :binary + "BLOB" + when :json, :jsonb + "TEXT" + else + raise ConfigurationError, "Unsupported column type for SQLite: #{type.inspect}" + end + end + + def quote_default(value) + case value + when true + "1" + when false + "0" + else + super + end end private diff --git a/lib/staging_table/bulk_inserter.rb b/lib/staging_table/bulk_inserter.rb index 21e5e98..7be42f6 
100644 --- a/lib/staging_table/bulk_inserter.rb +++ b/lib/staging_table/bulk_inserter.rb @@ -4,11 +4,12 @@ module StagingTable class BulkInserter - attr_reader :model, :batch_size + attr_reader :model, :batch_size, :conflict_resolver - def initialize(model, batch_size: 1000) + def initialize(model, batch_size: 1000, insert_on_conflict: nil) @model = model @batch_size = batch_size + @conflict_resolver = ConflictResolver.new(connection, insert_on_conflict) end def insert(records) @@ -30,6 +31,7 @@ def insert(records) end.join(", ") sql = "INSERT INTO #{quoted_table} (#{quoted_columns}) VALUES #{values_list}" + sql += conflict_resolver.conflict_clause(model.table_name) connection.execute(sql) end end diff --git a/lib/staging_table/conflict_resolver.rb b/lib/staging_table/conflict_resolver.rb new file mode 100644 index 0000000..f447dda --- /dev/null +++ b/lib/staging_table/conflict_resolver.rb @@ -0,0 +1,169 @@ +# frozen_string_literal: true + +module StagingTable + class ConflictResolver + VALID_STRATEGIES = %i[greatest least new existing sum coalesce].freeze + + attr_reader :connection, :options + + def initialize(connection, options) + @connection = connection + @options = options || {} + validate_options! if enabled? + end + + def enabled? + options.is_a?(Hash) && options[:target].present? && options[:update].present? + end + + def conflict_clause(table_name) + return "" unless enabled? 
+ + target_columns = Array(options[:target]) + update_rules = options[:update] + + quoted_targets = target_columns.map { |c| connection.quote_column_name(c) } + set_clauses = build_set_clauses(table_name, update_rules) + + case adapter + when :postgresql, :sqlite + on_conflict_clause(quoted_targets, set_clauses) + when :mysql + mysql_conflict_clause(quoted_targets, set_clauses) + else + raise AdapterError, "Conflict resolution is not supported for adapter: #{connection.adapter_name}" + end + end + + private + + def adapter + name = connection.adapter_name.downcase + case name + when /postgresql/ then :postgresql + when /mysql|trilogy/ then :mysql + when /sqlite/ then :sqlite + end + end + + def validate_options! + unless options[:target].is_a?(Array) || options[:target].is_a?(Symbol) + raise ConfigurationError, "insert_on_conflict :target must be a symbol or array of symbols" + end + + unless options[:update].is_a?(Hash) + raise ConfigurationError, "insert_on_conflict :update must be a hash" + end + + options[:update].each do |_column, strategy| + next if strategy.is_a?(String) # Raw SQL is allowed + unless VALID_STRATEGIES.include?(strategy) + raise ConfigurationError, "Invalid conflict resolution strategy: #{strategy}. Valid strategies are: #{VALID_STRATEGIES.join(", ")}, or a raw SQL string." 
+ end + end + end + + def build_set_clauses(table_name, update_rules) + quoted_table = connection.quote_table_name(table_name) + + update_rules.filter_map do |column, strategy| + next if strategy == :existing # :existing means keep current value (omit from SET) + + quoted_col = connection.quote_column_name(column) + value = resolve_value(quoted_table, quoted_col, strategy) + "#{quoted_col} = #{value}" + end + end + + def resolve_value(table_name, quoted_col, strategy) + case strategy + when :greatest + greatest_expression(table_name, quoted_col) + when :least + least_expression(table_name, quoted_col) + when :new + new_value_expression(quoted_col) + when :sum + sum_expression(table_name, quoted_col) + when :coalesce + coalesce_expression(table_name, quoted_col) + when String + strategy # Raw SQL passed through + else + raise ConfigurationError, "Unknown conflict resolution strategy: #{strategy}" + end + end + + def greatest_expression(table_name, quoted_col) + case adapter + when :postgresql + "GREATEST(#{table_name}.#{quoted_col}, EXCLUDED.#{quoted_col})" + when :mysql + "GREATEST(#{table_name}.#{quoted_col}, VALUES(#{quoted_col}))" + when :sqlite + "MAX(#{table_name}.#{quoted_col}, excluded.#{quoted_col})" + end + end + + def least_expression(table_name, quoted_col) + case adapter + when :postgresql + "LEAST(#{table_name}.#{quoted_col}, EXCLUDED.#{quoted_col})" + when :mysql + "LEAST(#{table_name}.#{quoted_col}, VALUES(#{quoted_col}))" + when :sqlite + "MIN(#{table_name}.#{quoted_col}, excluded.#{quoted_col})" + end + end + + def new_value_expression(quoted_col) + case adapter + when :postgresql + "EXCLUDED.#{quoted_col}" + when :mysql + "VALUES(#{quoted_col})" + when :sqlite + "excluded.#{quoted_col}" + end + end + + def sum_expression(table_name, quoted_col) + case adapter + when :postgresql + "#{table_name}.#{quoted_col} + EXCLUDED.#{quoted_col}" + when :mysql + "#{table_name}.#{quoted_col} + VALUES(#{quoted_col})" + when :sqlite + "#{table_name}.#{quoted_col} 
+ excluded.#{quoted_col}" + end + end + + def coalesce_expression(table_name, quoted_col) + case adapter + when :postgresql + "COALESCE(EXCLUDED.#{quoted_col}, #{table_name}.#{quoted_col})" + when :mysql + "COALESCE(VALUES(#{quoted_col}), #{table_name}.#{quoted_col})" + when :sqlite + "COALESCE(excluded.#{quoted_col}, #{table_name}.#{quoted_col})" + end + end + + def on_conflict_clause(quoted_targets, set_clauses) + return " ON CONFLICT (#{quoted_targets.join(", ")}) DO NOTHING" if set_clauses.empty? + + " ON CONFLICT (#{quoted_targets.join(", ")}) DO UPDATE SET #{set_clauses.join(", ")}" + end + + def mysql_conflict_clause(quoted_targets, set_clauses) + # MySQL has no DO NOTHING; emit a no-op assignment on a target column so + # the duplicate row is silently ignored instead of raising. + if set_clauses.empty? + noop_col = quoted_targets.first + return " ON DUPLICATE KEY UPDATE #{noop_col} = #{noop_col}" + end + + " ON DUPLICATE KEY UPDATE #{set_clauses.join(", ")}" + end + end +end diff --git a/lib/staging_table/session.rb b/lib/staging_table/session.rb index cd2ad71..e180ae1 100644 --- a/lib/staging_table/session.rb +++ b/lib/staging_table/session.rb @@ -72,7 +72,7 @@ def insert(records) } Instrumentation.instrument(:insert, payload) do - BulkInserter.new(staging_model, batch_size: options[:batch_size] || 1000).insert(normalized_records) + BulkInserter.new(staging_model, batch_size: options[:batch_size] || 1000, insert_on_conflict: options[:insert_on_conflict]).insert(normalized_records) end run_callback(:after_insert, self, normalized_records) @@ -98,7 +98,7 @@ def insert_from_query(relation) relation.find_in_batches(batch_size: options[:batch_size] || 1000) do |batch| records = batch.map(&:attributes) all_records.concat(records) - BulkInserter.new(staging_model, batch_size: options[:batch_size] || 1000).insert(records) + BulkInserter.new(staging_model, batch_size: options[:batch_size] || 1000, insert_on_conflict: options[:insert_on_conflict]).insert(records) 
end instrumentation_payload[:record_count] = all_records.size end diff --git a/lib/staging_table/transfer_strategies/insert.rb b/lib/staging_table/transfer_strategies/insert.rb index a208e68..5a50a86 100644 --- a/lib/staging_table/transfer_strategies/insert.rb +++ b/lib/staging_table/transfer_strategies/insert.rb @@ -14,7 +14,9 @@ def transfer staged_count = @staging_model.count return TransferResult.new if staged_count.zero? - columns = @staging_model.column_names.map { |c| @connection.quote_column_name(c) }.join(", ") + # Only transfer columns that exist on both staging and source tables + transferable_columns = @staging_model.column_names & @source_model.column_names + columns = transferable_columns.map { |c| @connection.quote_column_name(c) }.join(", ") source_table = @connection.quote_table_name(@source_model.table_name) staging_table = @connection.quote_table_name(@staging_model.table_name) diff --git a/lib/staging_table/transfer_strategies/upsert.rb b/lib/staging_table/transfer_strategies/upsert.rb index 4ef6cd1..7901708 100644 --- a/lib/staging_table/transfer_strategies/upsert.rb +++ b/lib/staging_table/transfer_strategies/upsert.rb @@ -144,7 +144,8 @@ def quote(value) end def column_names - @staging_model.column_names + # Only transfer columns that exist on both staging and source tables + @staging_model.column_names & @source_model.column_names end def quote_column(name) diff --git a/spec/staging_table/conflict_resolver_spec.rb b/spec/staging_table/conflict_resolver_spec.rb new file mode 100644 index 0000000..b751a0d --- /dev/null +++ b/spec/staging_table/conflict_resolver_spec.rb @@ -0,0 +1,350 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe StagingTable::ConflictResolver do + let(:connection) { ActiveRecord::Base.connection } + let(:table_name) { "staging_test" } + let(:columns) { %w[id name email priority score] } + + describe "#enabled?" 
do + it "returns false when options is nil" do + resolver = described_class.new(connection, nil) + expect(resolver.enabled?).to be false + end + + it "returns false when options is empty" do + resolver = described_class.new(connection, {}) + expect(resolver.enabled?).to be false + end + + it "returns false when target is missing" do + resolver = described_class.new(connection, {update: {priority: :greatest}}) + expect(resolver.enabled?).to be false + end + + it "returns false when update is missing" do + resolver = described_class.new(connection, {target: [:email]}) + expect(resolver.enabled?).to be false + end + + it "returns true when both target and update are present" do + resolver = described_class.new(connection, {target: [:email], update: {priority: :greatest}}) + expect(resolver.enabled?).to be true + end + end + + describe "validation" do + it "raises error for invalid target type" do + expect { + described_class.new(connection, {target: "email", update: {priority: :greatest}}) + }.to raise_error(StagingTable::ConfigurationError, /target must be a symbol or array/) + end + + it "raises error for invalid update type" do + expect { + described_class.new(connection, {target: [:email], update: "priority"}) + }.to raise_error(StagingTable::ConfigurationError, /update must be a hash/) + end + + it "raises error for invalid strategy" do + expect { + described_class.new(connection, {target: [:email], update: {priority: :invalid}}) + }.to raise_error(StagingTable::ConfigurationError, /Invalid conflict resolution strategy: invalid/) + end + + it "accepts valid symbol strategies" do + valid_strategies = %i[greatest least new existing sum coalesce] + valid_strategies.each do |strategy| + expect { + described_class.new(connection, {target: [:email], update: {priority: strategy}}) + }.not_to raise_error + end + end + + it "accepts raw SQL strings as strategies" do + expect { + described_class.new(connection, {target: [:email], update: {priority: 
"CUSTOM_FUNC(priority)"}}) + }.not_to raise_error + end + end + + shared_examples "conflict clause generation" do + let(:adapter) { StagingTable::Adapters::Base.for(connection) } + + describe "#conflict_clause" do + it "returns empty string when not enabled" do + resolver = described_class.new(connection, nil) + expect(resolver.conflict_clause(table_name)).to eq("") + end + + it "generates clause with single target column" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("email") + end + + it "generates clause with multiple target columns" do + resolver = described_class.new(connection, { + target: [:email, :name], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("email") + expect(clause).to include("name") + end + + it "omits columns with :existing strategy from SET clause" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :existing, score: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).not_to include("priority") + expect(clause).to include("score") + end + + it "handles :greatest strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :greatest} + }) + clause = resolver.conflict_clause(table_name) + expect(clause.downcase).to match(/greatest|max/) + expect(clause).to include("priority") + end + + it "handles :least strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :least} + }) + clause = resolver.conflict_clause(table_name) + expect(clause.downcase).to match(/least|min/) + expect(clause).to include("priority") + end + + it "handles :new strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to 
include("priority") + end + + it "handles :sum strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {score: :sum} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("+") + expect(clause).to include("score") + end + + it "handles :coalesce strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :coalesce} + }) + clause = resolver.conflict_clause(table_name) + expect(clause.downcase).to include("coalesce") + expect(clause).to include("priority") + end + + it "passes through raw SQL strings" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: "CUSTOM_FUNC(priority, 10)"} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("CUSTOM_FUNC(priority, 10)") + end + + it "handles multiple update columns" do + resolver = described_class.new(connection, { + target: [:email], + update: { + priority: :greatest, + score: :least, + name: :new + } + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("priority") + expect(clause).to include("score") + expect(clause).to include("name") + end + end + end + + context "with PostgreSQL", :postgresql do + include_examples "conflict clause generation" + + it "generates PostgreSQL-specific ON CONFLICT syntax" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("ON CONFLICT") + expect(clause).to include("DO UPDATE SET") + expect(clause).to include("EXCLUDED") + end + + it "generates DO NOTHING when all columns use :existing" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :existing} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("DO NOTHING") + end + end + + context "with MySQL", :mysql do + # MySQL uses ON DUPLICATE KEY which doesn't 
explicitly name target columns + # so we skip the shared examples that check for target column presence + let(:adapter) { StagingTable::Adapters::Base.for(connection) } + + describe "#conflict_clause" do + it "returns empty string when not enabled" do + resolver = described_class.new(connection, nil) + expect(resolver.conflict_clause(table_name)).to eq("") + end + + it "omits columns with :existing strategy from SET clause" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :existing, score: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).not_to include("priority") + expect(clause).to include("score") + end + + it "handles :greatest strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :greatest} + }) + clause = resolver.conflict_clause(table_name) + expect(clause.downcase).to include("greatest") + expect(clause).to include("priority") + end + + it "handles :least strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :least} + }) + clause = resolver.conflict_clause(table_name) + expect(clause.downcase).to include("least") + expect(clause).to include("priority") + end + + it "handles :new strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("priority") + end + + it "handles :sum strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {score: :sum} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("+") + expect(clause).to include("score") + end + + it "handles :coalesce strategy" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :coalesce} + }) + clause = resolver.conflict_clause(table_name) + expect(clause.downcase).to include("coalesce") + expect(clause).to 
include("priority") + end + + it "passes through raw SQL strings" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: "CUSTOM_FUNC(priority, 10)"} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("CUSTOM_FUNC(priority, 10)") + end + + it "handles multiple update columns" do + resolver = described_class.new(connection, { + target: [:email], + update: { + priority: :greatest, + score: :least, + name: :new + } + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("priority") + expect(clause).to include("score") + expect(clause).to include("name") + end + end + + it "generates MySQL-specific ON DUPLICATE KEY syntax" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("ON DUPLICATE KEY UPDATE") + expect(clause).to include("VALUES") + end + + it "emits a no-op assignment when all columns use :existing" do + # MySQL has no DO NOTHING, so we assign the target column to itself to + # silently ignore the duplicate row instead of raising RecordNotUnique. 
+ resolver = described_class.new(connection, { + target: [:email], + update: {priority: :existing} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("ON DUPLICATE KEY UPDATE") + expect(clause).to match(/`email`\s*=\s*`email`/) + end + end + + context "with SQLite", :sqlite do + include_examples "conflict clause generation" + + it "generates SQLite-specific ON CONFLICT syntax" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :new} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("ON CONFLICT") + expect(clause).to include("DO UPDATE SET") + expect(clause.downcase).to include("excluded") + end + + it "generates DO NOTHING when all columns use :existing" do + resolver = described_class.new(connection, { + target: [:email], + update: {priority: :existing} + }) + clause = resolver.conflict_clause(table_name) + expect(clause).to include("DO NOTHING") + end + end +end diff --git a/spec/staging_table/errors_spec.rb b/spec/staging_table/errors_spec.rb index 9e54996..748e696 100644 --- a/spec/staging_table/errors_spec.rb +++ b/spec/staging_table/errors_spec.rb @@ -85,7 +85,9 @@ describe StagingTable::RecordError do it "is raised for invalid record format" do - inserter = StagingTable::BulkInserter.new(double("Model")) + connection = double("Connection") + model_double = double("Model", connection: connection) + inserter = StagingTable::BulkInserter.new(model_double) expect { inserter.insert(["not a hash"]) diff --git a/spec/staging_table/extra_columns_spec.rb b/spec/staging_table/extra_columns_spec.rb new file mode 100644 index 0000000..7bd30f3 --- /dev/null +++ b/spec/staging_table/extra_columns_spec.rb @@ -0,0 +1,209 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe "Extra Columns" do + shared_examples "extra columns support" do + let(:adapter) { StagingTable::Adapters::Base.for(ActiveRecord::Base.connection) } + let(:temp_table_name) { 
"staging_test_#{SecureRandom.hex(8)}" } + + after do + adapter.drop_table(temp_table_name) + end + + describe "adding columns with simple type" do + it "adds an integer column" do + adapter.create_table(temp_table_name, "test_users", extra_columns: {priority: :integer}) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + expect(staging_model.column_names).to include("priority") + end + + it "adds a string column" do + adapter.create_table(temp_table_name, "test_users", extra_columns: {status: :string}) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + expect(staging_model.column_names).to include("status") + end + + it "adds a boolean column" do + adapter.create_table(temp_table_name, "test_users", extra_columns: {processed: :boolean}) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + expect(staging_model.column_names).to include("processed") + end + + it "adds multiple columns" do + adapter.create_table(temp_table_name, "test_users", extra_columns: { + priority: :integer, + status: :string, + processed: :boolean + }) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + expect(staging_model.column_names).to include("priority", "status", "processed") + end + end + + describe "adding columns with options" do + it "adds a column with a default value" do + adapter.create_table(temp_table_name, "test_users", extra_columns: { + processed: {type: :boolean, default: false} + }) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + expect(staging_model.column_names).to include("processed") + + # Insert a record without specifying the extra column + inserter = StagingTable::BulkInserter.new(staging_model) + 
inserter.insert([{name: "John", email: "john@example.com"}]) + + record = staging_model.first + expect([false, 0]).to include(record.processed) # SQLite stores as 0 + end + + it "adds an integer column with a default" do + adapter.create_table(temp_table_name, "test_users", extra_columns: { + priority: {type: :integer, default: 0} + }) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + inserter = StagingTable::BulkInserter.new(staging_model) + inserter.insert([{name: "John", email: "john@example.com"}]) + + record = staging_model.first + expect(record.priority).to eq(0) + end + end + + describe "querying extra columns" do + it "can query on extra columns" do + adapter.create_table(temp_table_name, "test_users", extra_columns: { + priority: {type: :integer, default: 0} + }) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + inserter = StagingTable::BulkInserter.new(staging_model) + inserter.insert([ + {name: "John", email: "john@example.com", priority: 1}, + {name: "Jane", email: "jane@example.com", priority: 2}, + {name: "Bob", email: "bob@example.com", priority: 1} + ]) + + high_priority = staging_model.where(priority: 2) + expect(high_priority.count).to eq(1) + expect(high_priority.first.name).to eq("Jane") + end + + it "can update extra columns" do + adapter.create_table(temp_table_name, "test_users", extra_columns: { + processed: {type: :boolean, default: false} + }) + + staging_model = StagingTable::ModelFactory.build(TestUser, temp_table_name) + staging_model.reset_column_information + + inserter = StagingTable::BulkInserter.new(staging_model) + inserter.insert([ + {name: "John", email: "john@example.com"}, + {name: "Jane", email: "jane@example.com"} + ]) + + # Update one record + staging_model.where(name: "John").update_all(processed: true) + + john = staging_model.find_by(name: "John") + jane = 
staging_model.find_by(name: "Jane") + + expect([true, 1]).to include(john.processed) # SQLite stores as 1 + expect([false, 0]).to include(jane.processed) # SQLite stores as 0 + end + end + + describe "integration with StagingTable.stage" do + before do + TestUser.delete_all + end + + it "creates staging table with extra columns" do + result = StagingTable.stage(TestUser, extra_columns: {priority: :integer}) do |staging| + staging.insert([ + {name: "John", email: "john@example.com", priority: 5} + ]) + + record = staging.first + expect(record.priority).to eq(5) + end + + expect(result).to be_a(StagingTable::TransferResult) + end + + it "extra columns do not affect transfer to target" do + result = StagingTable.stage(TestUser, extra_columns: {priority: :integer}) do |staging| + staging.insert([ + {name: "Jane", email: "jane@example.com", priority: 5} + ]) + end + + expect(result.inserted).to eq(1) + user = TestUser.find_by(name: "Jane") + expect(user).not_to be_nil + expect(user.email).to eq("jane@example.com") + # priority column doesn't exist on TestUser, should not cause error + end + + it "omits extra columns from the transfer SQL" do + executed_sql = [] + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |*, payload| + executed_sql << payload[:sql] if payload[:sql].is_a?(String) + end + + begin + StagingTable.stage(TestUser, extra_columns: {priority: :integer}) do |staging| + staging.insert([{name: "Zed", email: "zed@example.com", priority: 9}]) + end + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) + end + + transfer_sql = executed_sql.find { |s| s.match?(/\AINSERT INTO .*test_users.*SELECT/m) } + expect(transfer_sql).not_to be_nil + expect(transfer_sql).not_to match(/\bpriority\b/i) + end + end + + describe "error handling" do + it "raises ConfigurationError for unknown column types" do + expect { + adapter.create_table(temp_table_name, "test_users", extra_columns: {mystery: :wizard}) + }.to 
raise_error(StagingTable::ConfigurationError, /Unsupported column type/) + end + end + end + + context "with PostgreSQL", :postgresql do + include_examples "extra columns support" + end + + context "with MySQL", :mysql do + include_examples "extra columns support" + end + + context "with SQLite", :sqlite do + include_examples "extra columns support" + end +end diff --git a/spec/staging_table/insert_on_conflict_integration_spec.rb b/spec/staging_table/insert_on_conflict_integration_spec.rb new file mode 100644 index 0000000..d98c0d0 --- /dev/null +++ b/spec/staging_table/insert_on_conflict_integration_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe "insert_on_conflict end-to-end" do + shared_examples "staging insert conflict resolution" do + before do + TestUser.delete_all + end + + it "applies :greatest/:sum/:new/:existing strategies across batches" do + StagingTable.stage( + TestUser, + include_indexes: true, + extra_columns: {priority: :integer, score: :integer}, + insert_on_conflict: { + target: [:email], + update: { + priority: :greatest, + score: :sum, + name: :new, + age: :existing + } + } + ) do |staging| + staging.insert([ + {email: "john@example.com", name: "John", age: 30, priority: 1, score: 10} + ]) + + staging.insert([ + {email: "john@example.com", name: "Johnny", age: 99, priority: 5, score: 20} + ]) + + record = staging.find_by(email: "john@example.com") + expect(record.priority).to eq(5) # greatest + expect(record.score).to eq(30) # sum + expect(record.name).to eq("Johnny") # new + expect(record.age).to eq(30) # existing (first write wins) + end + end + + it "emits DO NOTHING / no SET clause when every column is :existing" do + StagingTable.stage( + TestUser, + include_indexes: true, + insert_on_conflict: { + target: [:email], + update: {name: :existing} + } + ) do |staging| + staging.insert([{email: "a@example.com", name: "First"}]) + staging.insert([{email: "a@example.com", name: "Second"}]) + + 
record = staging.find_by(email: "a@example.com") + expect(record.name).to eq("First") + end + end + end + + context "with PostgreSQL", :postgresql do + include_examples "staging insert conflict resolution" + end + + # MySQL's ON DUPLICATE KEY UPDATE fires on any unique constraint; here that is + # the test_users email index, so the behavior is the same from the user's perspective. + context "with MySQL", :mysql do + include_examples "staging insert conflict resolution" + end + + context "with SQLite", :sqlite do + include_examples "staging insert conflict resolution" + end +end