Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ end
Don't let duplicates crash your party. Configure the transfer strategy to handle conflicts gracefully.

```ruby
StagingTable.stage(User,
StagingTable.stage(User,
transfer_strategy: :upsert, # Default is :insert
conflict_target: [:email], # Column(s) to check for conflicts
conflict_action: :update # :update (overwrite) or :ignore (skip)
Expand All @@ -112,6 +112,85 @@ StagingTable.stage(User,
end
```

### 📦 Extra Columns

Need columns in your staging table that don't exist in the source model? Perfect for tracking import metadata, priorities, or processing flags.

```ruby
StagingTable.stage(User,
extra_columns: {
priority: :integer, # Simple type
processed: {type: :boolean, default: false}, # With options
import_batch: {type: :string, default: "batch_1"}
}
) do |staging|
# Insert with extra column values
staging.insert([
{name: "John", email: "john@example.com", priority: 1},
{name: "Jane", email: "jane@example.com", priority: 2}
])

# Query using extra columns
staging.where(priority: 1).find_each do |record|
# Process high priority records first
end

# Mark as processed
staging.where(processed: false).update_all(processed: true)

# Extra columns are automatically excluded during transfer
end
```

**Supported column types:** `:string`, `:text`, `:integer`, `:bigint`, `:float`, `:decimal`, `:boolean`, `:datetime`, `:date`, `:time`, `:binary`, `:json`, `:jsonb` (PostgreSQL), `:uuid` (PostgreSQL)

### 🔀 Staging Insert Conflict Resolution

Handle conflicts when inserting into the staging table itself. Useful when you're aggregating data from multiple sources or processing the same records multiple times.

```ruby
StagingTable.stage(User,
extra_columns: {priority: :integer, score: :integer},
insert_on_conflict: {
target: [:email], # Column(s) to detect conflicts
update: {
priority: :greatest, # Keep the higher value
score: :sum, # Add values together
name: :new, # Use the incoming value
age: :existing # Keep the existing value (skip update)
}
}
) do |staging|
# First batch
staging.insert([
{email: "john@example.com", name: "John", priority: 1, score: 10}
])

# Second batch - conflicts are resolved automatically
staging.insert([
{email: "john@example.com", name: "Johnny", priority: 5, score: 20}
])

# Result: priority=5 (greatest), score=30 (sum), name="Johnny" (new)
end
```

**Available resolution strategies:**

| Strategy | Description | Example Result |
|----------|-------------|----------------|
| `:greatest` | Keep the larger value | `GREATEST(existing, incoming)` |
| `:least` | Keep the smaller value | `LEAST(existing, incoming)` |
| `:new` | Use the incoming value | Overwrites existing |
| `:existing` | Keep the existing value | Skips update for this column |
| `:sum` | Add values together | `existing + incoming` |
| `:coalesce` | Use incoming if not null, else existing | `COALESCE(incoming, existing)` |
| `"raw SQL"` | Custom SQL expression | `"COALESCE(excluded.col, staging.col)"` |

**Note:** For PostgreSQL and SQLite, `target` columns are used explicitly in the `ON CONFLICT` clause and must be covered by a unique or exclusion constraint.

For MySQL, `ON DUPLICATE KEY UPDATE` is triggered by *any* violated unique (or primary) key on the table — not just the ones listed in `:target`. If your staging table has multiple unique indexes, the update may fire on conflicts you did not intend. Make sure the column(s) you list in `:target` are the only unique constraint that matters for your use case, or remove any other unique indexes before inserting. Raw SQL strategies using `VALUES(col)` are deprecated since MySQL 8.0.20; the clause still works but may emit warnings.

### 📊 Transfer Results

Every transfer returns a `TransferResult` with detailed statistics:
Expand Down
10 changes: 10 additions & 0 deletions lib/staging_table.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "staging_table/transfer_result"
require "staging_table/session"
require "staging_table/model_factory"
require "staging_table/conflict_resolver"
require "staging_table/bulk_inserter"
require "staging_table/adapters/base"
require "staging_table/adapters/postgresql"
Expand All @@ -34,6 +35,15 @@ def configure
# @option options [Symbol] :transfer_strategy :insert or :upsert (default: :insert)
# @option options [Array<Symbol>] :conflict_target Columns for upsert conflict detection
# @option options [Symbol] :conflict_action :update or :ignore for upsert conflicts
# @option options [Hash] :extra_columns Additional columns to add to staging table only.
# Keys are column names, values are either a symbol type (:integer, :string, :boolean, etc.)
# or a hash with :type, :default, and :null options.
# Example: { priority: :integer, processed: { type: :boolean, default: false } }
# @option options [Hash] :insert_on_conflict Conflict resolution for staging table inserts.
# :target - Column(s) to detect conflicts on (Symbol or Array of Symbols)
# :update - Hash of column => strategy pairs. Strategies: :greatest, :least, :new,
# :existing, :sum, :coalesce, or a raw SQL string.
# Example: { target: [:external_id], update: { priority: :greatest, score: :least } }
# @option options [Proc] :before_insert Called before inserting into staging
# @option options [Proc] :after_insert Called after inserting into staging
# @option options [Proc] :before_transfer Called before transferring to target
Expand Down
79 changes: 78 additions & 1 deletion lib/staging_table/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,91 @@ def self.for(connection)
case adapter_name
when /postgresql/
Postgresql.new(connection)
when /mysql/
when /mysql|trilogy/
Mysql.new(connection)
when /sqlite/
Sqlite.new(connection)
else
raise AdapterError, "Unsupported adapter: #{adapter_name}. StagingTable supports PostgreSQL, MySQL, and SQLite adapters."
end
end

protected

def add_extra_columns(table_name, extra_columns)
return if extra_columns.nil? || extra_columns.empty?

extra_columns.each do |column_name, column_spec|
add_column(table_name, column_name, column_spec)
end
end

def add_column(table_name, column_name, column_spec)
quoted_table = connection.quote_table_name(table_name)
quoted_column = connection.quote_column_name(column_name)
column_definition = parse_column_spec(column_spec)

sql = "ALTER TABLE #{quoted_table} ADD COLUMN #{quoted_column} #{column_definition}"
connection.execute(sql)
end

def parse_column_spec(spec)
if spec.is_a?(Symbol)
sql_type_for(spec)
elsif spec.is_a?(Hash)
type = sql_type_for(spec[:type])
parts = [type]
parts << "DEFAULT #{quote_default(spec[:default])}" if spec.key?(:default)
parts << "NOT NULL" if spec[:null] == false
parts.join(" ")
else
raise ConfigurationError, "Invalid column spec: #{spec.inspect}. Must be a symbol or hash with :type key."
end
end

def sql_type_for(type)
case type
when :string, :text
"TEXT"
when :integer
"INTEGER"
when :float
"REAL"
when :decimal
"DECIMAL"
when :boolean
"BOOLEAN"
when :datetime, :timestamp
"TIMESTAMP"
when :date
"DATE"
when :time
"TIME"
when :binary
"BLOB"
when :json, :jsonb
"TEXT"
else
raise ConfigurationError, "Unsupported column type: #{type.inspect}"
end
end

def quote_default(value)
case value
when nil
"NULL"
when true
"TRUE"
when false
"FALSE"
when Numeric
value.to_s
when String
connection.quote(value)
else
connection.quote(value.to_s)
end
end
end
end
end
46 changes: 46 additions & 0 deletions lib/staging_table/adapters/mysql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,52 @@ def create_table(temp_table_name, source_table_name, options = {})
quoted_temp = connection.quote_table_name(temp_table_name)
quoted_source = connection.quote_table_name(source_table_name)
connection.execute("CREATE TABLE #{quoted_temp} LIKE #{quoted_source}")

add_extra_columns(temp_table_name, options[:extra_columns])
end

protected

def sql_type_for(type)
case type
when :string
"VARCHAR(255)"
when :text
"TEXT"
when :integer
"INT"
when :bigint
"BIGINT"
when :float
"DOUBLE"
when :decimal
"DECIMAL"
when :boolean
"TINYINT(1)"
when :datetime, :timestamp
"DATETIME"
when :date
"DATE"
when :time
"TIME"
when :binary
"BLOB"
when :json
"JSON"
else
raise ConfigurationError, "Unsupported column type for MySQL: #{type.inspect}"
end
end

def quote_default(value)
case value
when true
"1"
when false
"0"
else
super
end
end
end
end
Expand Down
50 changes: 50 additions & 0 deletions lib/staging_table/adapters/postgresql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,56 @@ def create_table(temp_table_name, source_table_name, options = {})
sql += " INCLUDING INDEXES" if options[:include_indexes]
sql += ")"
connection.execute(sql)

add_extra_columns(temp_table_name, options[:extra_columns])
end

protected

def sql_type_for(type)
case type
when :string
"VARCHAR"
when :text
"TEXT"
when :integer
"INTEGER"
when :bigint
"BIGINT"
when :float
"DOUBLE PRECISION"
when :decimal
"DECIMAL"
when :boolean
"BOOLEAN"
when :datetime, :timestamp
"TIMESTAMP"
when :date
"DATE"
when :time
"TIME"
when :binary
"BYTEA"
when :json
"JSON"
when :jsonb
"JSONB"
when :uuid
"UUID"
else
raise ConfigurationError, "Unsupported column type for PostgreSQL: #{type.inspect}"
end
end

def quote_default(value)
case value
when true
"true"
when false
"false"
else
super
end
end
end
end
Expand Down
36 changes: 36 additions & 0 deletions lib/staging_table/adapters/sqlite.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,42 @@ def create_table(temp_table_name, source_table_name, options = {})
if options[:include_indexes]
copy_indexes(temp_table_name, source_table_name)
end

add_extra_columns(temp_table_name, options[:extra_columns])
end

protected

def sql_type_for(type)
case type
when :string, :text
"TEXT"
when :integer, :bigint
"INTEGER"
when :float, :decimal
"REAL"
when :boolean
"INTEGER"
when :datetime, :timestamp, :date, :time
"TEXT"
when :binary
"BLOB"
when :json, :jsonb
"TEXT"
else
raise ConfigurationError, "Unsupported column type for SQLite: #{type.inspect}"
end
end

def quote_default(value)
case value
when true
"1"
when false
"0"
else
super
end
end

private
Expand Down
6 changes: 4 additions & 2 deletions lib/staging_table/bulk_inserter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

module StagingTable
class BulkInserter
attr_reader :model, :batch_size
attr_reader :model, :batch_size, :conflict_resolver

def initialize(model, batch_size: 1000)
def initialize(model, batch_size: 1000, insert_on_conflict: nil)
@model = model
@batch_size = batch_size
@conflict_resolver = ConflictResolver.new(connection, insert_on_conflict)
end

def insert(records)
Expand All @@ -30,6 +31,7 @@ def insert(records)
end.join(", ")

sql = "INSERT INTO #{quoted_table} (#{quoted_columns}) VALUES #{values_list}"
sql += conflict_resolver.conflict_clause(model.table_name)
connection.execute(sql)
end
end
Expand Down
Loading
Loading