From ae46b8f1fdf93e1db23100d88432c37245d130f3 Mon Sep 17 00:00:00 2001
From: Shibayan95
Date: Wed, 8 Apr 2026 12:06:55 +0530
Subject: [PATCH] Resolve conflict in cherry-pick of
 5e35988b17aaa3af0d9dd27af33460b364a56485 and change the commit message

The earlier revision of this patch committed unresolved merge-conflict
markers (<<<<<<< / ======= / >>>>>>>) into both files, which is a Ruby
syntax error. The markers are removed here: the parallel batch-upsert
loop from 5e35988b1 is applied and its specs are kept verbatim.

---
 server/lib/reverse_etl/loaders/standard.rb    |  12 +
 .../lib/reverse_etl/loaders/standard_spec.rb  | 205 ++++++++++++++++++
 2 files changed, 217 insertions(+)

diff --git a/server/lib/reverse_etl/loaders/standard.rb b/server/lib/reverse_etl/loaders/standard.rb
index e209c38ca..e19b8a05e 100644
--- a/server/lib/reverse_etl/loaders/standard.rb
+++ b/server/lib/reverse_etl/loaders/standard.rb
@@ -84,6 +84,18 @@ def process_batch_records(sync_run, sync, sync_config, activity)
       end
       update_sync_records_status(sync_run, successfull_sync_records, failed_sync_records)
       heartbeat(activity, sync_run)
+
+      sync_run.sync_records.pending.find_in_batches(batch_size:).each_slice(THREAD_COUNT) do |batch_of_sync_records|
+        mutex = Mutex.new
+        successful_sync_records = []
+        failed_sync_records = []
+        Parallel.each(batch_of_sync_records, in_threads: THREAD_COUNT) do |sync_records|
+          process_single_batch(sync, sync_run, sync_config, sync_records,
+                               mutex, successful_sync_records, failed_sync_records)
+        end
+        update_sync_records_status(sync_run, successful_sync_records, failed_sync_records)
+        heartbeat(activity, sync_run)
+      end
     end
 
     def handle_response(report, sync_run)
diff --git a/server/spec/lib/reverse_etl/loaders/standard_spec.rb b/server/spec/lib/reverse_etl/loaders/standard_spec.rb
index 439aa7704..a6fc22b31 100644
--- a/server/spec/lib/reverse_etl/loaders/standard_spec.rb
+++ b/server/spec/lib/reverse_etl/loaders/standard_spec.rb
@@ -335,6 +335,211 @@
       end
     end
 
+    context "when individual record processing hits StandardError" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
+      let(:client) { instance_double(sync_individual.destination.connector_client) }
+      before do
+        allow(client).to receive(:connector_spec).and_return(connector_spec)
+      end
+
+      it "marks sync_record as failed" do
+        sync_config = sync_individual.to_protocol
+        sync_config.sync_run_id = sync_run_individual.id.to_s
+
+        allow(sync_individual.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).with(sync_config, [transform],
+                                              "destination_insert").and_raise(StandardError.new("write error"))
+        expect(subject).to receive(:heartbeat).once.with(activity, sync_run_individual)
+        subject.write(sync_run_individual.id, activity)
+        sync_record_individual.reload
+        expect(sync_record_individual.status).to eq("failed")
+        expect(sync_record_individual.logs).to eq({ "error" => "write error" })
+      end
+    end
+
+    context "when individual record processing hits ActiveRecord::RecordNotUnique" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
+      let(:client) { instance_double(sync_individual.destination.connector_client) }
+      before do
+        allow(client).to receive(:connector_spec).and_return(connector_spec)
+      end
+
+      it "marks sync_record as failed with unique violation" do
+        sync_config = sync_individual.to_protocol
+        sync_config.sync_run_id = sync_run_individual.id.to_s
+
+        allow(sync_individual.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write)
+          .with(sync_config, [transform], "destination_insert")
+          .and_raise(ActiveRecord::RecordNotUnique.new("duplicate key"))
+        expect(subject).to receive(:heartbeat).once.with(activity, sync_run_individual)
+        subject.write(sync_run_individual.id, activity)
+        sync_record_individual.reload
+        expect(sync_record_individual.status).to eq("failed")
+      end
+    end
+
+    context "when individual record processing creates per-thread clients" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
+      let(:client) { instance_double(sync_individual.destination.connector_client) }
+      let(:tracker) do
+        Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 1, failed: 0)
+      end
+      let(:multiwoven_message) { tracker.to_multiwoven_message }
+      before do
+        allow(client).to receive(:connector_spec).and_return(connector_spec)
+      end
+
+      it "calls connector_client.new per thread, not shared" do
+        sync_config = sync_individual.to_protocol
+        sync_config.sync_run_id = sync_run_individual.id.to_s
+
+        expect(sync_individual.destination.connector_client).to receive(:new).at_least(:once).and_return(client)
+        allow(client).to receive(:write).and_return(multiwoven_message)
+        subject.write(sync_run_individual.id, activity)
+      end
+    end
+
+    context "when individual record processing cleans up client" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) { transformer.transform(sync_individual, sync_record_individual) }
+      let(:client) { double("client", connector_spec:, close: nil) }
+
+      it "closes client even when error occurs" do
+        sync_config = sync_individual.to_protocol
+        sync_config.sync_run_id = sync_run_individual.id.to_s
+
+        allow(sync_individual.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).and_raise(StandardError.new("boom"))
+        expect(client).to receive(:close).at_least(:once)
+        subject.write(sync_run_individual.id, activity)
+      end
+    end
+
+    context "when batch processing hits StandardError" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) do
+        [transformer.transform(sync_batch, sync_record_batch1), transformer.transform(sync_batch, sync_record_batch2)]
+      end
+      let(:client) { instance_double(sync_batch.destination.connector_client) }
+      before do
+        allow(client).to receive(:connector_spec).and_return(connector_spec)
+      end
+
+      it "marks all batch records as failed" do
+        sync_config = sync_batch.to_protocol
+        sync_config.sync_run_id = sync_run_batch.id.to_s
+
+        allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).with(sync_config, transform).and_raise(StandardError.new("batch error"))
+        expect(subject).to receive(:heartbeat).once.with(activity, sync_run_batch)
+        subject.write(sync_run_batch.id, activity)
+        sync_run_batch.sync_records.reload.each do |sync_record|
+          expect(sync_record.status).to eq("failed")
+        end
+      end
+    end
+
+    context "when batch processing hits ActiveRecord::RecordNotUnique" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) do
+        [transformer.transform(sync_batch, sync_record_batch1), transformer.transform(sync_batch, sync_record_batch2)]
+      end
+      let(:client) { instance_double(sync_batch.destination.connector_client) }
+      before do
+        allow(client).to receive(:connector_spec).and_return(connector_spec)
+      end
+
+      it "marks all batch records as failed" do
+        sync_config = sync_batch.to_protocol
+        sync_config.sync_run_id = sync_run_batch.id.to_s
+
+        allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write)
+          .with(sync_config, transform).and_raise(ActiveRecord::RecordNotUnique.new("duplicate"))
+        expect(subject).to receive(:heartbeat).once.with(activity, sync_run_batch)
+        subject.write(sync_run_batch.id, activity)
+        sync_run_batch.sync_records.reload.each do |sync_record|
+          expect(sync_record.status).to eq("failed")
+        end
+      end
+    end
+
+    context "when batch processing cleans up client" do
+      let(:transformer) { ReverseEtl::Transformers::UserMapping.new }
+      let(:transform) do
+        [transformer.transform(sync_batch, sync_record_batch1), transformer.transform(sync_batch, sync_record_batch2)]
+      end
+      let(:client) { double("client", connector_spec:, close: nil) }
+
+      it "closes client even when error occurs" do
+        sync_config = sync_batch.to_protocol
+        sync_config.sync_run_id = sync_run_batch.id.to_s
+
+        allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).and_raise(StandardError.new("boom"))
+        expect(client).to receive(:close).once
+        subject.write(sync_run_batch.id, activity)
+      end
+    end
+
+    context "when batch support is enabled and records span multiple batch groups" do
+      tracker = Multiwoven::Integrations::Protocol::TrackingMessage.new(
+        success: 10,
+        failed: 0
+      )
+      let(:multiwoven_message) { tracker.to_multiwoven_message }
+      let(:client) { instance_double(sync_batch.destination.connector_client) }
+
+      before do
+        allow(client).to receive(:connector_spec).and_return(connector_spec)
+        # 2 records already exist (sync_record_batch1, sync_record_batch2).
+        # To get 2 each_slice groups we need > THREAD_COUNT batches (batch_size=10).
+        # Formula: THREAD_COUNT * 10 - 2 + 1 extra records → THREAD_COUNT+1 batches → 2 groups.
+        # Works regardless of SYNC_LOADER_THREAD_POOL_SIZE env value.
+        thread_count = described_class::THREAD_COUNT
+        extra_records = (thread_count * 10) - 2 + 1
+        extra_records.times do |i|
+          create(:sync_record, sync: sync_batch, sync_run: sync_run_batch, primary_key: "extra_#{i}")
+        end
+      end
+
+      it "calls heartbeat once per batch group, not once total" do
+        sync_config = sync_batch.to_protocol
+        sync_config.sync_run_id = sync_run_batch.id.to_s
+        allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).and_return(multiwoven_message)
+
+        expect(subject).to receive(:heartbeat).twice.with(activity, sync_run_batch)
+        subject.write(sync_run_batch.id, activity)
+      end
+
+      it "calls update_sync_records_status once per batch group" do
+        sync_config = sync_batch.to_protocol
+        sync_config.sync_run_id = sync_run_batch.id.to_s
+        allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).and_return(multiwoven_message)
+
+        expect(subject).to receive(:update_sync_records_status).twice.and_call_original
+        subject.write(sync_run_batch.id, activity)
+      end
+
+      it "marks all records across all groups as success" do
+        sync_config = sync_batch.to_protocol
+        sync_config.sync_run_id = sync_run_batch.id.to_s
+        allow(sync_batch.destination.connector_client).to receive(:new).and_return(client)
+        allow(client).to receive(:write).and_return(multiwoven_message)
+
+        subject.write(sync_run_batch.id, activity)
+        sync_run_batch.sync_records.reload.each do |sync_record|
+          expect(sync_record.status).to eq("success")
+        end
+      end
+    end
+
     context "when the report has tracking logs with a message" do
       let(:log_message) { '{"request":"Sample log message"}' }
       let(:report) do