diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java index 202c782c14..af7e70229f 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java @@ -42,7 +42,7 @@ @Slf4j public abstract class SolrAbstractSink implements Sink { - private SolrSinkConfig solrSinkConfig; + protected SolrSinkConfig solrSinkConfig; private SolrClient client; private boolean enableBasicAuth; @@ -77,8 +77,19 @@ public void write(Record record) { ); } - SolrInputDocument document = convert(record); - updateRequest.add(document); + try { + SolrInputDocument document = convert(record); + if (document == null) { + // It was a DELETE event or a skip, Acknowledge it so it isn't retried + record.ack(); + return; + } + updateRequest.add(document); + } catch (Exception e) { + log.error("Failed to convert record: {}", record, e); + record.fail(); + return; + } try { UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection()); @@ -103,6 +114,10 @@ public void close() throws Exception { // convert record as a Solr document public abstract SolrInputDocument convert(Record message); + protected SolrClient getSolrClient() { + return client; + } + public static SolrClient getClient(SolrMode solrMode, String url) { SolrClient solrClient = null; if (solrMode.equals(SolrMode.STANDALONE)) { diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java index bbf863839c..00fc425a3e 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.Field; import 
org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; @@ -38,15 +39,169 @@ ) @Slf4j public class SolrGenericRecordSink extends SolrAbstractSink { + + /** + * Entry point for conversion. Determines if the record requires CDC unwrapping + * based on user configuration. + */ @Override - public SolrInputDocument convert(Record message) { - SolrInputDocument doc = new SolrInputDocument(); - GenericRecord record = message.getValue(); - List fields = record.getFields(); - for (Field field : fields) { - Object fieldValue = record.getField(field); - doc.setField(field.getName(), fieldValue); - } - return doc; + public SolrInputDocument convert(Record pulsarRecord) { + SolrInputDocument solrDocument = new SolrInputDocument(); + GenericRecord messageValue = pulsarRecord.getValue(); + + if (solrSinkConfig != null && solrSinkConfig.isUnwrapDebeziumRecord()) { + return mapDebeziumPayload(messageValue, solrDocument); + } + + // Default mapping for non-CDC messages now uses the same safe population logic + populateSolrFields(messageValue, solrDocument); + return solrDocument; + } + + /** + * Orchestrates the Debezium extraction process and ensures processing errors + * are thrown to trigger Pulsar's failure handling. 
+ */ + private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInputDocument solrDocument) { + try { + GenericRecord payloadRecord = extractValueRecord(rootRecord); + + if (isDebeziumEnvelope(payloadRecord)) { + payloadRecord = extractAfterRecord(payloadRecord, solrDocument); + // Return null if extractAfterRecord handled a DELETE or skipped the record + if (payloadRecord == null) { + return null; + } + } + + populateSolrFields(payloadRecord, solrDocument); + return solrDocument; + } catch (Exception ex) { + log.error("Debezium record processing failed", ex); + throw new RuntimeException("Failed to extract Debezium payload", ex); + } + } + + /** + * Handles Pulsar's KeyValue schema by checking the native object and + * extracting the 'Value' part of the pair. + */ + private GenericRecord extractValueRecord(GenericRecord rootRecord) { + Object nativePayload = rootRecord.getNativeObject(); + + if (!(nativePayload instanceof KeyValue)) { + return rootRecord; + } + + Object valuePart = ((KeyValue) nativePayload).getValue(); + if (valuePart instanceof GenericRecord) { + log.debug("Detected KeyValue wrapper, extracting value section"); + return (GenericRecord) valuePart; + } + + return rootRecord; + } + + /** + * Identifies if the record is a Debezium CDC envelope. + * Debezium envelopes signify state changes and contain specific metadata fields. + * The "after" field signifies the newest state of the database row after an insert/update. + * We check for the presence of standard CDC fields to differentiate a true envelope + * from a normal database table that simply happens to have a column named "after". 
+ */ + private boolean isDebeziumEnvelope(GenericRecord record) { + boolean hasAfter = false; + boolean hasBefore = false; + boolean hasOp = false; + + for (Field field : record.getFields()) { + String fieldName = field.getName(); + if ("after".equals(fieldName)) hasAfter = true; + else if ("before".equals(fieldName)) hasBefore = true; + else if ("op".equals(fieldName)) hasOp = true; + } + + // A CDC envelope must strictly contain the operation code and a data state + return (hasAfter || hasBefore) && hasOp; + } + + /** + * Separates the INSERT/UPDATE path from the DELETE path. + * Issues immediate deletions to Solr for DELETE events; a failed delete propagates as an exception. + */ + private GenericRecord extractAfterRecord(GenericRecord envelopeRecord, SolrInputDocument solrDocument) { + Object afterField = envelopeRecord.getField("after"); + + if (afterField != null) { + return (afterField instanceof GenericRecord) ? (GenericRecord) afterField : envelopeRecord; + } + + // Processing Debezium DELETE path (where 'after' is null) + log.debug("Debezium DELETE event detected, Processing deletion"); + Object beforeField = envelopeRecord.getField("before"); + + if (!(beforeField instanceof GenericRecord)) { + log.warn("DELETE event received, but 'before' field is missing or invalid."); + return null; + } + + GenericRecord beforeRecord = (GenericRecord) beforeField; + List fields = beforeRecord.getFields(); + + if (fields.isEmpty()) { + log.warn("DELETE event received, but 'before' record has no fields to extract ID."); + return null; + } + + // NOTE(review): assumes the first 'before' field is the primary key - confirm for composite keys + Object id = beforeRecord.getField(fields.get(0)); + if (id == null) { + log.warn("DELETE event received, but primary key field value was null."); + return null; + } + + executeSolrDelete(id); + return null; + } + + /** + * Deletes by id from the configured collection; failures are rethrown so the record is failed, not acked. 
+ */ + private void executeSolrDelete(Object id) { + try { + int commitWithinMs = (solrSinkConfig != null && solrSinkConfig.getSolrCommitWithinMs() > 0) + ? solrSinkConfig.getSolrCommitWithinMs() : 1000; + // Target the configured collection, as write() does, instead of the client's default collection + getSolrClient().deleteById(solrSinkConfig.getSolrCollection(), String.valueOf(id), commitWithinMs); + log.debug("Successfully issued delete to Solr for id={} with commitWithinMs={}", id, commitWithinMs); + } catch (Exception e) { + throw new IllegalStateException("Failed to delete document from Solr for id=" + id, e); + } + } + + /** + * Iterates through all fields in a GenericRecord to build the Solr document, + * applying type normalization to each value. + */ + private void populateSolrFields(GenericRecord dataRecord, SolrInputDocument solrDocument) { + for (Field field : dataRecord.getFields()) { + Object rawValue = dataRecord.getField(field); + // Skip nulls and nested records as Solr requires flat field values + if (rawValue == null || rawValue instanceof GenericRecord) { + continue; + } + solrDocument.setField(field.getName(), normalizeValue(rawValue)); + } + } + + /** + * Ensures numeric types are safely coerced to Strings to maintain + * compatibility with Solr's field requirements. + */ + private Object normalizeValue(Object value) { + if (value instanceof Integer || value instanceof Long) { + return String.valueOf(value); + } + return value; } } diff --git a/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java index daa93a366b..5a889e6221 100644 --- a/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java +++ b/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java @@ -81,6 +81,13 @@ public class SolrSinkConfig implements Serializable { help = "The password to use for basic authentication") private String password; + @FieldDoc( + required = false, + defaultValue = "false", + help = "If true, the sink will attempt to extract the nested 'after' field from CDC/Debezium records." 
+ ) + private boolean unwrapDebeziumRecord = false; + public static SolrSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), SolrSinkConfig.class); diff --git a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java index 9544f63977..0056cd8e47 100644 --- a/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java +++ b/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java @@ -20,11 +20,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericSchema; import org.apache.pulsar.client.impl.MessageImpl; @@ -32,8 +34,11 @@ import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.solr.common.SolrInputDocument; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -46,6 +51,8 @@ public class SolrGenericRecordSinkTest { private SolrServerUtil solrServerUtil; private Message message; + private SolrGenericRecordSink sink; + private Map configs; /** * A Simple class to test solr class. 
@@ -60,6 +67,14 @@ public static class Foo { public void setUp() throws Exception { solrServerUtil = new SolrServerUtil(8983); solrServerUtil.startStandaloneSolr(); + sink = new SolrGenericRecordSink(); + configs = new HashMap<>(); + configs.put("solrUrl", "http://localhost:8983/solr"); + configs.put("solrMode", "Standalone"); + configs.put("solrCollection", "techproducts"); + configs.put("solrCommitWithinMs", "100"); + configs.put("username", ""); + configs.put("password", ""); } @AfterMethod(alwaysRun = true) @@ -70,13 +85,6 @@ public void tearDown() throws Exception { @Test public void testOpenAndWriteSink() throws Exception { message = mock(MessageImpl.class); - Map configs = new HashMap<>(); - configs.put("solrUrl", "http://localhost:8983/solr"); - configs.put("solrMode", "Standalone"); - configs.put("solrCollection", "techproducts"); - configs.put("solrCommitWithinMs", "100"); - configs.put("username", ""); - configs.put("password", ""); GenericSchema genericAvroSchema; SolrGenericRecordSink sink = new SolrGenericRecordSink(); @@ -109,4 +117,116 @@ public void testOpenAndWriteSink() throws Exception { // open should success sink.open(configs, null); } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumUnwrapEnvelope() throws Exception { + configs.put("unwrapDebeziumRecord", true); + sink.open(configs, null); + + // Build root record wrapping a KeyValue payload + GenericRecord mockRootRecord = mock(GenericRecord.class); + Record mockMessage = mock(Record.class); + when(mockMessage.getValue()).thenReturn(mockRootRecord); + + GenericRecord mockValueRecord = mock(GenericRecord.class); + when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); + // isDebeziumEnvelope() iterates getFields() - must include "after" (or "before") and "op" here + Field afterSchemaField = mock(Field.class); + when(afterSchemaField.getName()).thenReturn("after"); + Field opField = mock(Field.class); + when(opField.getName()).thenReturn("op"); + 
when(mockValueRecord.getFields()).thenReturn(Arrays.asList(afterSchemaField, opField)); + + // extractAfterRecord() calls getField("after") by string - return nested record + GenericRecord mockAfterRecord = mock(GenericRecord.class); + when(mockValueRecord.getField("after")).thenReturn(mockAfterRecord); + + // Fields inside the "after" payload to be mapped into Solr document + Field idField = mock(Field.class); + when(idField.getName()).thenReturn("id"); + Field userIdField = mock(Field.class); + when(userIdField.getName()).thenReturn("user_id"); + when(mockAfterRecord.getFields()).thenReturn(Arrays.asList(idField, userIdField)); + when(mockAfterRecord.getField(idField)).thenReturn(100); + when(mockAfterRecord.getField(userIdField)).thenReturn(999); + + SolrInputDocument doc = sink.convert(mockMessage); + + Assert.assertNotNull(doc, "Document should not be null for envelope with 'after' field"); + Assert.assertEquals(doc.getFieldValue("id"), "100"); + Assert.assertEquals(doc.getFieldValue("user_id"), "999"); + } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumUnwrapFlatValue() throws Exception { + configs.put("unwrapDebeziumRecord", true); + sink.open(configs, null); + + // Build root record wrapping a KeyValue payload + GenericRecord mockRootRecord = mock(GenericRecord.class); + Record mockMessage = mock(Record.class); + when(mockMessage.getValue()).thenReturn(mockRootRecord); + + GenericRecord mockValueRecord = mock(GenericRecord.class); + when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); + + // isDebeziumEnvelope() will iterate these, no "after"/"before"/"op" fields, so flat path is taken + Field idField = mock(Field.class); + when(idField.getName()).thenReturn("id"); + Field profileIdField = mock(Field.class); + when(profileIdField.getName()).thenReturn("profile_id"); + when(mockValueRecord.getFields()).thenReturn(Arrays.asList(idField, profileIdField)); + + // populateSolrFields() reads values by Field object 
reference + when(mockValueRecord.getField(idField)).thenReturn(200); + when(mockValueRecord.getField(profileIdField)).thenReturn(801); + SolrInputDocument doc = sink.convert(mockMessage); + + Assert.assertNotNull(doc, "Document should not be null for flat value record"); + Assert.assertEquals(doc.getFieldValue("id"), "200"); + Assert.assertEquals(doc.getFieldValue("profile_id"), "801"); + Assert.assertNull(doc.getFieldValue("user_id"), "user_id was not provided, should be null"); + } + + @Test + @SuppressWarnings("unchecked") + public void testDebeziumUnwrapDelete() throws Exception { + configs.put("unwrapDebeziumRecord", true); + sink.open(configs, null); + + // Build root record + GenericRecord mockRootRecord = mock(GenericRecord.class); + Record mockMessage = mock(Record.class); + when(mockMessage.getValue()).thenReturn(mockRootRecord); + + GenericRecord mockValueRecord = mock(GenericRecord.class); + when(mockRootRecord.getNativeObject()).thenReturn(new KeyValue<>("key", mockValueRecord)); + + // Mock Debezium DELETE signature: 'after' is null, 'before' and 'op' are present + Field beforeSchemaField = mock(Field.class); + when(beforeSchemaField.getName()).thenReturn("before"); + Field opField = mock(Field.class); + when(opField.getName()).thenReturn("op"); + when(mockValueRecord.getFields()).thenReturn(Arrays.asList(beforeSchemaField, opField)); + + // 'after' is null for deletes + when(mockValueRecord.getField("after")).thenReturn(null); + + // Mock 'before' record containing the ID to be deleted + GenericRecord mockBeforeRecord = mock(GenericRecord.class); + when(mockValueRecord.getField("before")).thenReturn(mockBeforeRecord); + + Field idField = mock(Field.class); + when(idField.getName()).thenReturn("id"); + when(mockBeforeRecord.getFields()).thenReturn(Arrays.asList(idField)); + when(mockBeforeRecord.getField(idField)).thenReturn(500); + + // This should trigger executeSolrDelete and return null + SolrInputDocument doc = sink.convert(mockMessage); + + // 
In our refactored logic, a DELETE returns null so the parent class can ACK it + Assert.assertNull(doc, "Document should be null for DELETE events to trigger ACK"); + } }