Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@Slf4j
public abstract class SolrAbstractSink<T> implements Sink<T> {

private SolrSinkConfig solrSinkConfig;
protected SolrSinkConfig solrSinkConfig;
private SolrClient client;
private boolean enableBasicAuth;

Expand Down Expand Up @@ -77,8 +77,19 @@ public void write(Record<T> record) {
);
}

SolrInputDocument document = convert(record);
updateRequest.add(document);
try {
SolrInputDocument document = convert(record);
if (document == null) {
// It was a DELETE event or a skip, Acknowledge it so it isn't retried
record.ack();
return;
}
updateRequest.add(document);
} catch (Exception e) {
log.error("Failed to convert record: {}", record, e);
record.fail();
return;
}

try {
UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection());
Expand All @@ -103,6 +114,10 @@ public void close() throws Exception {
/**
 * Converts a record into the Solr document that should be indexed for it.
 *
 * <p>How {@code write} treats the outcome: a {@code null} return means "nothing to index" and the
 * record is acknowledged; an exception thrown from here is logged and the record is failed.
 *
 * @param message the incoming record
 * @return the document to add to the update request, or {@code null} to skip the record
 */
public abstract SolrInputDocument convert(Record<T> message);

/**
 * Gives subclasses access to the Solr client held by this sink.
 *
 * @return the value of the {@code client} field as currently set
 */
protected SolrClient getSolrClient() {
    return this.client;
}

public static SolrClient getClient(SolrMode solrMode, String url) {
SolrClient solrClient = null;
if (solrMode.equals(SolrMode.STANDALONE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
Expand All @@ -38,15 +39,169 @@
)
@Slf4j
public class SolrGenericRecordSink extends SolrAbstractSink<GenericRecord> {

/**
* Entry point for conversion. Determines if the record requires CDC unwrapping
* based on user configuration.
*/
@Override
public SolrInputDocument convert(Record<GenericRecord> message) {
SolrInputDocument doc = new SolrInputDocument();
GenericRecord record = message.getValue();
List<Field> fields = record.getFields();
for (Field field : fields) {
Object fieldValue = record.getField(field);
doc.setField(field.getName(), fieldValue);
}
return doc;
public SolrInputDocument convert(Record<GenericRecord> pulsarRecord) {
    // Entry point for conversion: picks the Debezium unwrapping path when the sink is configured
    // for it, otherwise maps the record's fields directly.
    GenericRecord messageValue = pulsarRecord.getValue();

    if (messageValue == null) {
        // A record without a value has nothing to index (for example the tombstone that
        // Debezium can emit after a DELETE). Returning null lets write() acknowledge it,
        // instead of hitting a NullPointerException below and failing the record.
        log.debug("Skipping record with null value");
        return null;
    }

    SolrInputDocument solrDocument = new SolrInputDocument();

    if (solrSinkConfig != null && solrSinkConfig.isUnwrapDebeziumRecord()) {
        // May return null when the event was a DELETE or had to be skipped.
        return mapDebeziumPayload(messageValue, solrDocument);
    }

    // Non-CDC messages go through the same null-skipping, type-normalizing field mapping.
    populateSolrFields(messageValue, solrDocument);
    return solrDocument;
}

/**
 * Runs the Debezium unwrapping pipeline for one record: strip a KeyValue wrapper if present,
 * resolve a CDC envelope to its row state, then copy that row's fields into the document.
 *
 * @param rootRecord   the record value as received
 * @param solrDocument the document to fill
 * @return {@code solrDocument} once populated, or {@code null} when the envelope resolved to
 *         nothing to index (a DELETE or a skipped event)
 * @throws RuntimeException wrapping any failure raised during processing, so the caller sees
 *         the record as failed
 */
private SolrInputDocument mapDebeziumPayload(GenericRecord rootRecord, SolrInputDocument solrDocument) {
    try {
        final GenericRecord unwrapped = extractValueRecord(rootRecord);

        // Only envelopes need resolving; anything else is already the row to index.
        final GenericRecord rowState = isDebeziumEnvelope(unwrapped)
                ? extractAfterRecord(unwrapped, solrDocument)
                : unwrapped;

        if (rowState == null) {
            // extractAfterRecord already dealt with the event (DELETE) or chose to skip it.
            return null;
        }

        populateSolrFields(rowState, solrDocument);
        return solrDocument;
    } catch (Exception ex) {
        log.error("Debezium record processing failed", ex);
        throw new RuntimeException("Failed to extract Debezium payload", ex);
    }
}

/**
 * Unwraps a KeyValue-encoded record.
 *
 * <p>If the record's native object is a {@link KeyValue} whose value part is itself a
 * {@link GenericRecord}, that value part is returned. In every other case the record is
 * returned unchanged.
 *
 * @param rootRecord the record value as received
 * @return the value section of a KeyValue pair, or {@code rootRecord} itself
 */
private GenericRecord extractValueRecord(GenericRecord rootRecord) {
    final Object nativePayload = rootRecord.getNativeObject();

    if (nativePayload instanceof KeyValue) {
        final Object candidate = ((KeyValue<?, ?>) nativePayload).getValue();
        if (candidate instanceof GenericRecord) {
            log.debug("Detected KeyValue wrapper, extracting value section");
            return (GenericRecord) candidate;
        }
    }

    return rootRecord;
}

/**
 * Decides whether a record looks like a Debezium CDC envelope.
 *
 * <p>The check is purely structural: the record must declare an {@code op} field together with
 * at least one of {@code after} or {@code before}. Requiring {@code op} as well keeps an
 * ordinary table that merely has a column called "after" from being mistaken for an envelope.
 *
 * @param record the record whose field names are inspected
 * @return {@code true} when the field names match the envelope shape described above
 */
private boolean isDebeziumEnvelope(GenericRecord record) {
    boolean carriesRowState = false;
    boolean carriesOperation = false;

    for (Field candidate : record.getFields()) {
        final String name = candidate.getName();
        if ("op".equals(name)) {
            carriesOperation = true;
        } else if ("after".equals(name) || "before".equals(name)) {
            carriesRowState = true;
        }
    }

    // Both parts are mandatory: the operation code and some row state.
    return carriesOperation && carriesRowState;
}

/**
 * Resolves a CDC envelope to the row that should be indexed, or performs the deletion when the
 * envelope describes a removed row.
 *
 * <p>When {@code after} is present it wins: a record-typed {@code after} is returned as is, and
 * any other non-null {@code after} makes the method hand back the envelope itself. When
 * {@code after} is null the event is treated as a DELETE: the first field of {@code before} is
 * taken as the document id and a delete is issued for it.
 *
 * <p>NOTE(review): using field index 0 as the primary key is an assumption about how the source
 * table is laid out - confirm for tables whose key is not the first column.
 *
 * @param envelopeRecord the Debezium envelope
 * @param solrDocument   the document being built; not touched by this method
 * @return the row to index, or {@code null} when nothing should be indexed
 */
private GenericRecord extractAfterRecord(GenericRecord envelopeRecord, SolrInputDocument solrDocument) {
    final Object newState = envelopeRecord.getField("after");

    if (newState instanceof GenericRecord) {
        return (GenericRecord) newState;
    }
    if (newState != null) {
        // 'after' exists but is not a nested record: fall back to the envelope itself.
        return envelopeRecord;
    }

    // From here on 'after' is null, which marks the DELETE path.
    log.debug("Debezium DELETE event detected, Processing deletion");
    final Object oldState = envelopeRecord.getField("before");

    if (oldState instanceof GenericRecord) {
        final GenericRecord deletedRow = (GenericRecord) oldState;
        final List<Field> columns = deletedRow.getFields();

        if (columns.isEmpty()) {
            log.warn("DELETE event received, but 'before' record has no fields to extract ID.");
        } else {
            final Object id = deletedRow.getField(columns.get(0));
            if (id == null) {
                log.warn("DELETE event received, but primary key field value was null.");
            } else {
                executeSolrDelete(id);
            }
        }
    } else {
        log.warn("DELETE event received, but 'before' field is missing or invalid.");
    }

    // A DELETE never yields a document to index, whether or not a delete was issued.
    return null;
}

/**
 * Deletes one document from Solr by id.
 *
 * <p>The delete is sent to the collection configured on the sink, which is the same collection
 * the indexing path writes to, so an add and its later delete cannot land in different places.
 * It is committed within the configured {@code solrCommitWithinMs}, or within 1000 ms when that
 * setting is absent or not positive.
 *
 * <p>Failures are propagated rather than swallowed. The caller returns {@code null} after this
 * method, which leads to the record being acknowledged; if a failed delete were only logged, the
 * DELETE event would be lost while the document stayed in the index.
 *
 * <p>NOTE(review): the base class carries an {@code enableBasicAuth} flag; this call goes
 * straight to the client and attaches no per-request credentials - confirm whether deletes
 * need them.
 *
 * @param id identifier of the document to remove; rendered with {@code String.valueOf}
 * @throws RuntimeException wrapping whatever the Solr client raised for the delete
 */
private void executeSolrDelete(Object id) {
    final boolean hasConfig = solrSinkConfig != null;
    final int commitWithinMs = (hasConfig && solrSinkConfig.getSolrCommitWithinMs() > 0)
            ? solrSinkConfig.getSolrCommitWithinMs() : 1000;
    // With no config the collection stays null, which leaves the choice to the client's
    // default collection (the behaviour of the overload that takes no collection argument).
    final String collection = hasConfig ? solrSinkConfig.getSolrCollection() : null;

    try {
        getSolrClient().deleteById(collection, String.valueOf(id), commitWithinMs);
        log.debug("Successfully issued delete to Solr for id={} with commitWithinMs={}", id, commitWithinMs);
    } catch (Exception e) {
        // Not logged here: the caller logs the full cause chain when it wraps this exception.
        throw new RuntimeException("Failed to delete document from Solr for id=" + id, e);
    }
}

/**
 * Copies the fields of a record into a Solr document, one Solr field per record field.
 *
 * <p>Null values and nested records are left out, since the document is built from flat
 * values only. Every value that is kept passes through {@code normalizeValue} first.
 *
 * @param dataRecord   the record supplying field names and values
 * @param solrDocument the document receiving the fields
 */
private void populateSolrFields(GenericRecord dataRecord, SolrInputDocument solrDocument) {
    for (Field column : dataRecord.getFields()) {
        final Object cell = dataRecord.getField(column);
        final boolean indexable = cell != null && !(cell instanceof GenericRecord);
        if (indexable) {
            solrDocument.setField(column.getName(), normalizeValue(cell));
        }
    }
}

/**
 * Normalizes a field value before it is stored in the Solr document.
 *
 * <p>{@link Integer} and {@link Long} values are rendered as their decimal string form; every
 * other value, including other numeric types, is returned untouched.
 *
 * @param value the raw field value
 * @return the string form for Integer/Long input, otherwise {@code value} itself
 */
private Object normalizeValue(Object value) {
    final boolean isWholeNumber = value instanceof Integer || value instanceof Long;
    return isWholeNumber ? String.valueOf(value) : value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public class SolrSinkConfig implements Serializable {
help = "The password to use for basic authentication")
private String password;

// Opt-in switch for CDC handling, off by default. SolrGenericRecordSink.convert reads it through
// isUnwrapDebeziumRecord() (presumably a generated accessor - confirm) and, when true, unwraps
// Debezium envelopes: the 'after' row is indexed, and an event whose 'after' is null is handled
// as a delete instead.
@FieldDoc(
required = false,
defaultValue = "false",
help = "If true, the sink will attempt to extract the nested 'after' field from CDC/Debezium records."
)
private boolean unwrapDebeziumRecord = false;

public static SolrSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), SolrSinkConfig.class);
Expand Down
Loading