From 52ae45ead177fa44af9bd68071a291bfe4401e1b Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Thu, 23 Apr 2026 18:25:25 +0530 Subject: [PATCH 1/5] [fix][io] Fix Solr Sink Debezium KeyValue unwrapping by bypassing generic schema API and extracting JSON payloads --- .../pulsar/io/solr/SolrAbstractSink.java | 11 +- .../pulsar/io/solr/SolrGenericRecordSink.java | 116 ++++++++++++++++-- .../apache/pulsar/io/solr/SolrSinkConfig.java | 7 ++ .../io/solr/SolrGenericRecordSinkTest.java | 91 ++++++++++++++ 4 files changed, 215 insertions(+), 10 deletions(-) diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java index 202c782c14..9a01bd4b80 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java @@ -42,7 +42,7 @@ @Slf4j public abstract class SolrAbstractSink implements Sink { - private SolrSinkConfig solrSinkConfig; + protected SolrSinkConfig solrSinkConfig; private SolrClient client; private boolean enableBasicAuth; @@ -78,6 +78,11 @@ public void write(Record record) { } SolrInputDocument document = convert(record); + if (document == null) { + log.error("Failed to convert record: {}", record); + record.ack(); + return; + } updateRequest.add(document); try { @@ -103,6 +108,10 @@ public void close() throws Exception { // convert record as a Solr document public abstract SolrInputDocument convert(Record message); + protected SolrClient getSolrClient() { + return client; + } + public static SolrClient getClient(SolrMode solrMode, String url) { SolrClient solrClient = null; if (solrMode.equals(SolrMode.STANDALONE)) { diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java index bbf863839c..17259d86aa 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; @@ -39,14 +40,111 @@ @Slf4j public class SolrGenericRecordSink extends SolrAbstractSink { @Override - public SolrInputDocument convert(Record message) { - SolrInputDocument doc = new SolrInputDocument(); - GenericRecord record = message.getValue(); - List fields = record.getFields(); - for (Field field : fields) { - Object fieldValue = record.getField(field); - doc.setField(field.getName(), fieldValue); - } - return doc; + public SolrInputDocument convert(Record pulsarRecord) { + SolrInputDocument solrDocument = new SolrInputDocument(); + GenericRecord messageValue = pulsarRecord.getValue(); + if (solrSinkConfig != null && solrSinkConfig.isUnwrapDebeziumRecord()) { + return mapDebeziumPayload(messageValue, solrDocument); + } + + // Default mapping for non-CDC messages + for (Field recordField : messageValue.getFields()) { + Object fieldValue = messageValue.getField(recordField); + if (fieldValue != null) { + solrDocument.setField(recordField.getName(), fieldValue); + } + } + return solrDocument; + } + + private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInputDocument solrDocument) { + try { + GenericRecord payloadRecord = extractValueRecord(rootRecord); + + if (containsAfterField(payloadRecord)) { + payloadRecord = extractAfterRecord(payloadRecord, solrDocument); + if (payloadRecord == null) { + return solrDocument; + } + } + populateSolrFields(payloadRecord, solrDocument); + return solrDocument; + } catch (Exception ex) { + log.error("Debezium record processing failed, returning empty Solr document", ex); + return solrDocument; + } + } + + private GenericRecord extractValueRecord(GenericRecord rootRecord) { + Object nativePayload = rootRecord.getNativeObject(); + if (nativePayload instanceof KeyValue) { + Object valuePart = ((KeyValue) nativePayload).getValue(); + if (valuePart instanceof GenericRecord) { + log.debug("Detected KeyValue wrapper, extracting value section"); + return (GenericRecord) valuePart; + } + } + return rootRecord; + } + + private boolean containsAfterField(GenericRecord record) { + for (Field field : record.getFields()) { + if ("after".equals(field.getName())) { + return true; + } + } + return false; + } + + private GenericRecord extractAfterRecord(GenericRecord envelopeRecord, SolrInputDocument solrDocument) { + Object afterField = envelopeRecord.getField("after"); + if (afterField == null) { + log.info("Debezium DELETE event detected, Processing deletion"); + + Object beforeField = envelopeRecord.getField("before"); + if (beforeField instanceof GenericRecord) { + GenericRecord beforeRecord = (GenericRecord) beforeField; + Object id = beforeRecord.getField("id"); + + if (id != null) { + try { + int commitWithinMs = (solrSinkConfig != null && solrSinkConfig.getSolrCommitWithinMs() > 0) + ? solrSinkConfig.getSolrCommitWithinMs() : 1000; + getSolrClient().deleteById(String.valueOf(id), commitWithinMs); + log.info("Successfully issued delete to Solr for id={} with commitWithinMs={}", + id, commitWithinMs); + } catch (Exception e) { + log.error("Failed to delete document from Solr for id={}", id, e); + } + } else { + log.warn("DELETE event received, but 'id' field was missing or null in the 'before' record."); + } + } else { + log.warn("DELETE event received, but 'before' field is not a GenericRecord."); + } + return null; + } + if (afterField instanceof GenericRecord) { + log.debug("Debezium envelope detected, extracting 'after' payload"); + return (GenericRecord) afterField; + } + return envelopeRecord; + } + + private void populateSolrFields(GenericRecord dataRecord, SolrInputDocument solrDocument) { + for (Field field : dataRecord.getFields()) { + Object rawValue = dataRecord.getField(field); + if (rawValue == null || rawValue instanceof GenericRecord) { + continue; + } + solrDocument.setField(field.getName(), normalizeValue(rawValue)); + } + } + + private Object normalizeValue(Object value) { + if (value instanceof Integer || value instanceof Long) { + return String.valueOf(value); + } + return value; } } diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java index daa93a366b..5a889e6221 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java @@ -81,6 +81,13 @@ public class SolrSinkConfig implements Serializable { help = "The password to use for basic authentication") private String password; + @FieldDoc( + required = false, + defaultValue = "false", + help = "If true, the sink will attempt to extract the nested 'after' field from CDC/Debezium records." + ) + private boolean unwrapDebeziumRecord = false; + public static SolrSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), SolrSinkConfig.class); diff --git a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java index 9544f63977..64efa597a5 100644 --- a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java +++ b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java @@ -20,11 +20,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.impl.MessageImpl; @@ -32,8 +34,11 @@ import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.solr.common.SolrInputDocument; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -109,4 +114,90 @@ public void testOpenAndWriteSink() throws Exception { // open should success sink.open(configs, null); } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumUnwrapEnvelope() throws Exception { + SolrGenericRecordSink sink = new SolrGenericRecordSink(); + Map configs = new HashMap<>(); + configs.put("solrUrl", "http://localhost:8983/solr"); + configs.put("solrMode", "Standalone"); + configs.put("solrCollection", "techproducts"); + configs.put("solrCommitWithinMs", "100"); + configs.put("username", ""); + configs.put("password", ""); + configs.put("unwrapDebeziumRecord", true); + sink.open(configs, null); + + // Build root record wrapping a KeyValue payload + GenericRecord mockRootRecord = mock(GenericRecord.class); + Record mockMessage = mock(Record.class); + when(mockMessage.getValue()).thenReturn(mockRootRecord); + + GenericRecord mockValueRecord = mock(GenericRecord.class); + when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); + // containsAfterField() iterates getFields() — must include "after" field here + Field afterSchemaField = mock(Field.class); + when(afterSchemaField.getName()).thenReturn("after"); + when(mockValueRecord.getFields()).thenReturn(Arrays.asList(afterSchemaField)); + + // extractAfterRecord() calls getField("after") by string — return nested record + GenericRecord mockAfterRecord = mock(GenericRecord.class); + when(mockValueRecord.getField("after")).thenReturn(mockAfterRecord); + + // Fields inside the "after" payload to be mapped into Solr document + Field idField = mock(Field.class); + when(idField.getName()).thenReturn("id"); + Field userIdField = mock(Field.class); + when(userIdField.getName()).thenReturn("user_id"); + when(mockAfterRecord.getFields()).thenReturn(Arrays.asList(idField, userIdField)); + when(mockAfterRecord.getField(idField)).thenReturn(100); + when(mockAfterRecord.getField(userIdField)).thenReturn(999); + + SolrInputDocument doc = sink.convert(mockMessage); + + Assert.assertNotNull(doc, "Document should not be null for envelope with 'after' field"); + Assert.assertEquals(doc.getFieldValue("id"), "100"); + Assert.assertEquals(doc.getFieldValue("user_id"), "999"); + } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumUnwrapFlatValue() throws Exception { + SolrGenericRecordSink sink = new SolrGenericRecordSink(); + Map configs = new HashMap<>(); + configs.put("solrUrl", "http://localhost:8983/solr"); + configs.put("solrMode", "Standalone"); + configs.put("solrCollection", "techproducts"); + configs.put("solrCommitWithinMs", "100"); + configs.put("username", ""); + configs.put("password", ""); + configs.put("unwrapDebeziumRecord", true); + sink.open(configs, null); + + // Build root record wrapping a KeyValue payload + GenericRecord mockRootRecord = mock(GenericRecord.class); + Record mockMessage = mock(Record.class); + when(mockMessage.getValue()).thenReturn(mockRootRecord); + + GenericRecord mockValueRecord = mock(GenericRecord.class); + when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); + + // containsAfterField() will iterate these — no "after" field, so flat path is taken + Field idField = mock(Field.class); + when(idField.getName()).thenReturn("id"); + Field profileIdField = mock(Field.class); + when(profileIdField.getName()).thenReturn("profile_id"); + when(mockValueRecord.getFields()).thenReturn(Arrays.asList(idField, profileIdField)); + + // populateSolrFields() reads values by Field object reference + when(mockValueRecord.getField(idField)).thenReturn(200); + when(mockValueRecord.getField(profileIdField)).thenReturn(801); + SolrInputDocument doc = sink.convert(mockMessage); + + Assert.assertNotNull(doc, "Document should not be null for flat value record"); + Assert.assertEquals(doc.getFieldValue("id"), "200"); + Assert.assertEquals(doc.getFieldValue("profile_id"), "801"); + Assert.assertNull(doc.getFieldValue("user_id"), "user_id was not provided, should be null"); + } } From 37d5cfb6f8ddaf45046076263385c403bb1580a0 Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Fri, 24 Apr 2026 12:39:16 +0530 Subject: [PATCH 2/5] DRY violation, avoid log spam, return null on mapping failure, and remove hardcoded id by dynamically extracting primary key --- .../pulsar/io/solr/SolrGenericRecordSink.java | 51 +++++++++++-------- .../io/solr/SolrGenericRecordSinkTest.java | 39 +++++--------- 2 files changed, 42 insertions(+), 48 deletions(-) diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java index 17259d86aa..c33da03a0c 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java @@ -47,13 +47,8 @@ public SolrInputDocument convert(Record pulsarRecord) { return mapDebeziumPayload(messageValue, solrDocument); } - // Default mapping for non-CDC messages - for (Field recordField : messageValue.getFields()) { - Object fieldValue = messageValue.getField(recordField); - if (fieldValue != null) { - solrDocument.setField(recordField.getName(), fieldValue); - } - } + // Default mapping for non-CDC messages now uses the same safe population logic + populateSolrFields(messageValue, solrDocument); return solrDocument; } @@ -64,14 +59,14 @@ private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInput if (containsAfterField(payloadRecord)) { payloadRecord = extractAfterRecord(payloadRecord, solrDocument); if (payloadRecord == null) { - return solrDocument; + return null; } } populateSolrFields(payloadRecord, solrDocument); return solrDocument; } catch (Exception ex) { log.error("Debezium record processing failed, returning empty Solr document", ex); - return solrDocument; + return null; } } @@ -98,36 +93,48 @@ private boolean containsAfterField(GenericRecord record) { private GenericRecord extractAfterRecord(GenericRecord envelopeRecord, SolrInputDocument solrDocument) { Object afterField = envelopeRecord.getField("after"); + if (afterField == null) { - log.info("Debezium DELETE event detected, Processing deletion"); + log.debug("Debezium DELETE event detected, Processing deletion"); Object beforeField = envelopeRecord.getField("before"); if (beforeField instanceof GenericRecord) { GenericRecord beforeRecord = (GenericRecord) beforeField; - Object id = beforeRecord.getField("id"); - - if (id != null) { - try { - int commitWithinMs = (solrSinkConfig != null && solrSinkConfig.getSolrCommitWithinMs() > 0) - ? solrSinkConfig.getSolrCommitWithinMs() : 1000; - getSolrClient().deleteById(String.valueOf(id), commitWithinMs); - log.info("Successfully issued delete to Solr for id={} with commitWithinMs={}", - id, commitWithinMs); - } catch (Exception e) { - log.error("Failed to delete document from Solr for id={}", id, e); + + // Dynamically extract the primary key + List fields = beforeRecord.getFields(); + if (!fields.isEmpty()) { + Object id = beforeRecord.getField(fields.get(0)); + + if (id != null) { + try { + int commitWithinMs = (solrSinkConfig != null && solrSinkConfig.getSolrCommitWithinMs() > 0) + ? solrSinkConfig.getSolrCommitWithinMs() : 1000; + + getSolrClient().deleteById(String.valueOf(id), commitWithinMs); + log.debug("Successfully issued delete to Solr for id={} with commitWithinMs={}", + id, commitWithinMs); + } catch (Exception e) { + log.error("Failed to delete document from Solr for id={}", id, e); + } + } else { + log.warn("DELETE event received, but primary key field was null."); } } else { - log.warn("DELETE event received, but 'id' field was missing or null in the 'before' record."); + log.warn("DELETE event received, but 'before' record had no fields."); } } else { log.warn("DELETE event received, but 'before' field is not a GenericRecord."); } + return null; } + if (afterField instanceof GenericRecord) { log.debug("Debezium envelope detected, extracting 'after' payload"); return (GenericRecord) afterField; } + return envelopeRecord; } diff --git a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java index 64efa597a5..4e700cfbfa 100644 --- a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java +++ b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java @@ -51,6 +51,8 @@ public class SolrGenericRecordSinkTest { private SolrServerUtil solrServerUtil; private Message message; + private SolrGenericRecordSink sink; + private Map configs; /** * A Simple class to test solr class. @@ -65,6 +67,14 @@ public static class Foo { public void setUp() throws Exception { solrServerUtil = new SolrServerUtil(8983); solrServerUtil.startStandaloneSolr(); + sink = new SolrGenericRecordSink(); + configs = new HashMap<>(); + configs.put("solrUrl", "http://localhost:8983/solr"); + configs.put("solrMode", "Standalone"); + configs.put("solrCollection", "techproducts"); + configs.put("solrCommitWithinMs", "100"); + configs.put("username", ""); + configs.put("password", ""); } @AfterMethod(alwaysRun = true) @@ -75,13 +85,6 @@ public void tearDown() throws Exception { @Test public void testOpenAndWriteSink() throws Exception { message = mock(MessageImpl.class); - Map configs = new HashMap<>(); - configs.put("solrUrl", "http://localhost:8983/solr"); - configs.put("solrMode", "Standalone"); - configs.put("solrCollection", "techproducts"); - configs.put("solrCommitWithinMs", "100"); - configs.put("username", ""); - configs.put("password", ""); GenericSchema genericAvroSchema; SolrGenericRecordSink sink = new SolrGenericRecordSink(); @@ -118,14 +121,6 @@ public void testOpenAndWriteSink() throws Exception { @Test @SuppressWarnings("unchecked") public void testDebeziumUnwrapEnvelope() throws Exception { - SolrGenericRecordSink sink = new SolrGenericRecordSink(); - Map configs = new HashMap<>(); - configs.put("solrUrl", "http://localhost:8983/solr"); - configs.put("solrMode", "Standalone"); - configs.put("solrCollection", "techproducts"); - configs.put("solrCommitWithinMs", "100"); - configs.put("username", ""); - configs.put("password", ""); configs.put("unwrapDebeziumRecord", true); sink.open(configs, null); @@ -136,12 +131,12 @@ public void testDebeziumUnwrapEnvelope() throws Exception { GenericRecord mockValueRecord = mock(GenericRecord.class); when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); - // containsAfterField() iterates getFields() — must include "after" field here + // containsAfterField() iterates getFields() - must include "after" field here Field afterSchemaField = mock(Field.class); when(afterSchemaField.getName()).thenReturn("after"); when(mockValueRecord.getFields()).thenReturn(Arrays.asList(afterSchemaField)); - // extractAfterRecord() calls getField("after") by string — return nested record + // extractAfterRecord() calls getField("after") by string - return nested record GenericRecord mockAfterRecord = mock(GenericRecord.class); when(mockValueRecord.getField("after")).thenReturn(mockAfterRecord); @@ -164,14 +159,6 @@ public void testDebeziumUnwrapEnvelope() throws Exception { @Test @SuppressWarnings("unchecked") public void testDebeziumUnwrapFlatValue() throws Exception { - SolrGenericRecordSink sink = new SolrGenericRecordSink(); - Map configs = new HashMap<>(); - configs.put("solrUrl", "http://localhost:8983/solr"); - configs.put("solrMode", "Standalone"); - configs.put("solrCollection", "techproducts"); - configs.put("solrCommitWithinMs", "100"); - configs.put("username", ""); - configs.put("password", ""); configs.put("unwrapDebeziumRecord", true); sink.open(configs, null); @@ -183,7 +170,7 @@ public void testDebeziumUnwrapFlatValue() throws Exception { GenericRecord mockValueRecord = mock(GenericRecord.class); when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); - // containsAfterField() will iterate these — no "after" field, so flat path is taken + // containsAfterField() will iterate these, no "after" field, so flat path is taken Field idField = mock(Field.class); when(idField.getName()).thenReturn("id"); Field profileIdField = mock(Field.class); From b010fd2c4bf334f1213bf29420194b2b4a484b2f Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Thu, 30 Apr 2026 17:48:50 +0530 Subject: [PATCH 3/5] fix: prevent data loss by using record.fail() on conversion errors and improve CDC detection by validating Debezium envelope (op + before/after) --- .../pulsar/io/solr/SolrAbstractSink.java | 16 ++++++---- .../pulsar/io/solr/SolrGenericRecordSink.java | 30 ++++++++++++++----- .../io/solr/SolrGenericRecordSinkTest.java | 4 ++- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java index 9a01bd4b80..af7e70229f 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java @@ -77,13 +77,19 @@ public void write(Record record) { ); } - SolrInputDocument document = convert(record); - if (document == null) { - log.error("Failed to convert record: {}", record); - record.ack(); + try { + SolrInputDocument document = convert(record); + if (document == null) { + // It was a DELETE event or a skip, Acknowledge it so it isn't retried + record.ack(); + return; + } + updateRequest.add(document); + } catch (Exception e) { + log.error("Failed to convert record: {}", record, e); + record.fail(); return; } - updateRequest.add(document); try { UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection()); diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java index c33da03a0c..66c2e2c64e 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java @@ -56,7 +56,7 @@ private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInput try { GenericRecord payloadRecord = extractValueRecord(rootRecord); - if (containsAfterField(payloadRecord)) { + if (isDebeziumEnvelope(payloadRecord)) { payloadRecord = extractAfterRecord(payloadRecord, solrDocument); if (payloadRecord == null) { return null; @@ -65,8 +65,8 @@ private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInput populateSolrFields(payloadRecord, solrDocument); return solrDocument; } catch (Exception ex) { - log.error("Debezium record processing failed, returning empty Solr document", ex); - return null; + log.error("Debezium record processing failed", ex); + throw new RuntimeException("Failed to extract Debezium payload", ex); } } @@ -82,13 +82,27 @@ private GenericRecord extractValueRecord(GenericRecord rootRecord) { return rootRecord; } - private boolean containsAfterField(GenericRecord record) { + /** + * Identifies if the record is a Debezium CDC envelope. + * Debezium envelopes signify state changes and contain specific metadata fields. + * The "after" field signifies the newest state of the database row after an insert/update. + * We check for the presence of standard CDC fields to differentiate a true envelope + * from a normal database table that simply happens to have a column named "after". + */ + private boolean isDebeziumEnvelope(GenericRecord record) { + boolean hasAfter = false; + boolean hasBefore = false; + boolean hasOp = false; + for (Field field : record.getFields()) { - if ("after".equals(field.getName())) { - return true; - } + String fieldName = field.getName(); + if ("after".equals(fieldName)) hasAfter = true; + else if ("before".equals(fieldName)) hasBefore = true; + else if ("op".equals(fieldName)) hasOp = true; } - return false; + + // A CDC envelope will contain these Debezium operational fields + return (hasAfter || hasBefore) && hasOp; } private GenericRecord extractAfterRecord(GenericRecord envelopeRecord, SolrInputDocument solrDocument) { diff --git a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java index 4e700cfbfa..6cd9fd9410 100644 --- a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java +++ b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java @@ -134,7 +134,9 @@ public void testDebeziumUnwrapEnvelope() throws Exception { // containsAfterField() iterates getFields() - must include "after" field here Field afterSchemaField = mock(Field.class); when(afterSchemaField.getName()).thenReturn("after"); - when(mockValueRecord.getFields()).thenReturn(Arrays.asList(afterSchemaField)); + Field opField = mock(Field.class); + when(opField.getName()).thenReturn("op"); + when(mockValueRecord.getFields()).thenReturn(Arrays.asList(afterSchemaField, opField)); // extractAfterRecord() calls getField("after") by string - return nested record GenericRecord mockAfterRecord = mock(GenericRecord.class); From 4bffdfdc33bc809c319ffc848581ccb157ce2a9d Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Wed, 6 May 2026 14:11:17 +0530 Subject: [PATCH 4/5] refactor the nested if-else statement --- .../pulsar/io/solr/SolrGenericRecordSink.java | 122 ++++++++++++------ 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java index 66c2e2c64e..00fc425a3e 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java @@ -39,10 +39,16 @@ ) @Slf4j public class SolrGenericRecordSink extends SolrAbstractSink { + + /** + * Entry point for conversion. Determines if the record requires CDC unwrapping + * based on user configuration. + */ @Override public SolrInputDocument convert(Record pulsarRecord) { SolrInputDocument solrDocument = new SolrInputDocument(); GenericRecord messageValue = pulsarRecord.getValue(); + if (solrSinkConfig != null && solrSinkConfig.isUnwrapDebeziumRecord()) { return mapDebeziumPayload(messageValue, solrDocument); } @@ -52,16 +58,22 @@ public SolrInputDocument convert(Record pulsarRecord) { return solrDocument; } + /** + * Orchestrates the Debezium extraction process and ensures processing errors + * are thrown to trigger Pulsar's failure handling. + */ private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInputDocument solrDocument) { try { GenericRecord payloadRecord = extractValueRecord(rootRecord); if (isDebeziumEnvelope(payloadRecord)) { payloadRecord = extractAfterRecord(payloadRecord, solrDocument); + // Return null if extractAfterRecord handled a DELETE or skipped the record if (payloadRecord == null) { return null; } } + populateSolrFields(payloadRecord, solrDocument); return solrDocument; } catch (Exception ex) { @@ -70,15 +82,23 @@ private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInput } } + /** + * Handles Pulsar's KeyValue schema by checking the native object and + * extracting the 'Value' part of the pair. + */ private GenericRecord extractValueRecord(GenericRecord rootRecord) { Object nativePayload = rootRecord.getNativeObject(); - if (nativePayload instanceof KeyValue) { - Object valuePart = ((KeyValue) nativePayload).getValue(); - if (valuePart instanceof GenericRecord) { - log.debug("Detected KeyValue wrapper, extracting value section"); - return (GenericRecord) valuePart; - } + + if (!(nativePayload instanceof KeyValue)) { + return rootRecord; } + + Object valuePart = ((KeyValue) nativePayload).getValue(); + if (valuePart instanceof GenericRecord) { + log.debug("Detected KeyValue wrapper, extracting value section"); + return (GenericRecord) valuePart; + } + return rootRecord; } @@ -101,60 +121,72 @@ private boolean isDebeziumEnvelope(GenericRecord record) { else if ("op".equals(fieldName)) hasOp = true; } - // A CDC envelope will contain these Debezium operational fields + // A CDC envelope must strictly contain the operation code and a data state return (hasAfter || hasBefore) && hasOp; } + /** + * Separates the INSERT/UPDATE path from the DELETE path. + * Issues immediate deletions to Solr for DELETE events. + */ private GenericRecord extractAfterRecord(GenericRecord envelopeRecord, SolrInputDocument solrDocument) { Object afterField = envelopeRecord.getField("after"); - if (afterField == null) { - log.debug("Debezium DELETE event detected, Processing deletion"); - - Object beforeField = envelopeRecord.getField("before"); - if (beforeField instanceof GenericRecord) { - GenericRecord beforeRecord = (GenericRecord) beforeField; - - // Dynamically extract the primary key - List fields = beforeRecord.getFields(); - if (!fields.isEmpty()) { - Object id = beforeRecord.getField(fields.get(0)); - - if (id != null) { - try { - int commitWithinMs = (solrSinkConfig != null && solrSinkConfig.getSolrCommitWithinMs() > 0) - ? solrSinkConfig.getSolrCommitWithinMs() : 1000; - - getSolrClient().deleteById(String.valueOf(id), commitWithinMs); - log.debug("Successfully issued delete to Solr for id={} with commitWithinMs={}", - id, commitWithinMs); - } catch (Exception e) { - log.error("Failed to delete document from Solr for id={}", id, e); - } - } else { - log.warn("DELETE event received, but primary key field was null."); - } - } else { - log.warn("DELETE event received, but 'before' record had no fields."); - } - } else { - log.warn("DELETE event received, but 'before' field is not a GenericRecord."); - } + if (afterField != null) { + return (afterField instanceof GenericRecord) ? (GenericRecord) afterField : envelopeRecord; + } + + // Processing Debezium DELETE path (where 'after' is null) + log.debug("Debezium DELETE event detected, Processing deletion"); + Object beforeField = envelopeRecord.getField("before"); + if (!(beforeField instanceof GenericRecord)) { + log.warn("DELETE event received, but 'before' field is missing or invalid."); return null; } - if (afterField instanceof GenericRecord) { - log.debug("Debezium envelope detected, extracting 'after' payload"); - return (GenericRecord) afterField; + GenericRecord beforeRecord = (GenericRecord) beforeField; + List fields = beforeRecord.getFields(); + + if (fields.isEmpty()) { + log.warn("DELETE event received, but 'before' record has no fields to extract ID."); + return null; } - return envelopeRecord; + // Assumes index 0 is the Primary Key per standard Debezium behavior + Object id = beforeRecord.getField(fields.get(0)); + if (id == null) { + log.warn("DELETE event received, but primary key field value was null."); + return null; + } + + executeSolrDelete(id); + return null; + } + + /** + * Helper to issue delete commands to the SolrClient using configured commit settings. + */ + private void executeSolrDelete(Object id) { + try { + int commitWithinMs = (solrSinkConfig != null && solrSinkConfig.getSolrCommitWithinMs() > 0) + ? solrSinkConfig.getSolrCommitWithinMs() : 1000; + + getSolrClient().deleteById(String.valueOf(id), commitWithinMs); + log.debug("Successfully issued delete to Solr for id={} with commitWithinMs={}", id, commitWithinMs); + } catch (Exception e) { + log.error("Failed to delete document from Solr for id={}", id, e); + } } + /** + * Iterates through all fields in a GenericRecord to build the Solr document, + * applying type normalization to each value. + */ private void populateSolrFields(GenericRecord dataRecord, SolrInputDocument solrDocument) { for (Field field : dataRecord.getFields()) { Object rawValue = dataRecord.getField(field); + // Skip nulls and nested records as Solr requires flat field values if (rawValue == null || rawValue instanceof GenericRecord) { continue; } @@ -162,6 +194,10 @@ private void populateSolrFields(GenericRecord dataRecord, SolrInputDocument solr } } + /** + * Ensures numeric types are safely coerced to Strings to maintain + * compatibility with Solr's field requirements. + */ private Object normalizeValue(Object value) { if (value instanceof Integer || value instanceof Long) { return String.valueOf(value); From 1ce2adbfcbfced17c06d79ea3608a38296db08f3 Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Sat, 23 May 2026 13:41:59 +0530 Subject: [PATCH 5/5] Add the DELETE test above to confirm that your executeSolrDelete call is reached --- .../io/solr/SolrGenericRecordSinkTest.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java index 6cd9fd9410..0056cd8e47 100644 --- a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java +++ b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java @@ -189,4 +189,44 @@ public void testDebeziumUnwrapFlatValue() throws Exception { Assert.assertEquals(doc.getFieldValue("profile_id"), "801"); Assert.assertNull(doc.getFieldValue("user_id"), "user_id was not provided, should be null"); } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumUnwrapDelete() throws Exception { + configs.put("unwrapDebeziumRecord", true); + sink.open(configs, null); + + // Build root record + GenericRecord mockRootRecord = mock(GenericRecord.class); + Record mockMessage = mock(Record.class); + when(mockMessage.getValue()).thenReturn(mockRootRecord); + + GenericRecord mockValueRecord = mock(GenericRecord.class); + when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); + + // Mock Debezium DELETE signature: 'after' is null, 'before' and 'op' are present + Field beforeSchemaField = mock(Field.class); + when(beforeSchemaField.getName()).thenReturn("before"); + Field opField = mock(Field.class); + when(opField.getName()).thenReturn("op"); + when(mockValueRecord.getFields()).thenReturn(Arrays.asList(beforeSchemaField, opField)); + + // 'after' is null for deletes + when(mockValueRecord.getField("after")).thenReturn(null); + + // Mock 'before' record containing the ID to be deleted + GenericRecord mockBeforeRecord = mock(GenericRecord.class); + when(mockValueRecord.getField("before")).thenReturn(mockBeforeRecord); + + Field idField = mock(Field.class); + when(idField.getName()).thenReturn("id"); + when(mockBeforeRecord.getFields()).thenReturn(Arrays.asList(idField)); + when(mockBeforeRecord.getField(idField)).thenReturn(500); + + // This should trigger executeSolrDelete and return null + SolrInputDocument doc = sink.convert(mockMessage); + + // In our refactored logic, a DELETE returns null so the parent class can ACK it + Assert.assertNull(doc, "Document should be null for DELETE events to trigger ACK"); + } }