Skip to content

Add forward_to support along with failure_pipeline support for kafka sink#6778

Open
graytaylor0 wants to merge 1 commit intoopensearch-project:mainfrom
graytaylor0:ForwardtoMsk
Open

Add forward_to support along with failure_pipeline support for kafka sink#6778
graytaylor0 wants to merge 1 commit intoopensearch-project:mainfrom
graytaylor0:ForwardtoMsk

Conversation

@graytaylor0
Copy link
Copy Markdown
Member

@graytaylor0 graytaylor0 commented Apr 24, 2026

Description

This change enables support for forward_to option for the kafka sink, as well as configuring a failure_pipeline.

Tested by running data prepper and verifying events are forwarded to correct sub-pipeline on success, and routed to failure-pipeline on failure to write to msk.

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…sink

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @graytaylor0 !

}
} else {
releaseEventHandles(true);
if (sinkContext != null && sinkForwardRecordsContext != null && sinkContext.getForwardToPipelines().size() > 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we need this?

&& sinkContext.getForwardToPipelines().size() > 0

Why would we get a SinkForwardRecordsContext if there is no pipeline to forward to?

callbackArgumentCaptor.getValue().onCompletion(null, new RuntimeException("kafka error"));

verify(numberOfRecordProcessingError).increment();
verify(failurePipeline).sendEvents(any());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify the error message.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the pipeline name

verify(kafkaProducer).send(any(ProducerRecord.class), callbackArgumentCaptor.capture());
callbackArgumentCaptor.getValue().onCompletion(mock(org.apache.kafka.clients.producer.RecordMetadata.class), null);

verify(sinkContext, org.mockito.Mockito.never()).forwardRecords(any(), any(), any());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a fully-qualified import?


verify(failurePipeline).sendEvents(any());
verify(failureMetadata).withPluginName("kafka");
verify(failureMetadata).with("topic", "test-topic");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify the error message.

} else {
releaseEventHandles(true);
if (sinkContext != null && sinkForwardRecordsContext != null && sinkContext.getForwardToPipelines().size() > 0) {
sinkForwardRecordsContext.addRecord(originalRecord);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that the OpenSearch sink forwards the OpenSearch document. Do we want to be consistent with that? This would be a little odd in this sink since it might be JSON or binary data. But it would retain some consistency.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants