
Fix Silent Row Loss in Apache NiFi Validation Pipeline #2

@agmangas

Description


Context

During a Data Quality run, Apache NiFi emits one Kafka message per validated row. The platform API determines completion by counting those messages.

For an example CSV dataset with 4,013 rows:

  • Expected: 4,013 validation messages
  • Received by Quality Reporter: 1,757 messages
  • NiFi showed no active processors and no queued FlowFiles

From NiFi’s perspective, the flow completed successfully. However, the platform never reached 100% (or the 90% fallback threshold) because not all rows were emitted.
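The completion check described above can be sketched as a small pure function (the function name and exact threshold handling are assumptions; the 90% fallback value comes from this issue):

```python
def completion_status(received: int, expected: int, fallback: float = 0.90) -> str:
    """Hypothetical sketch of the platform's completion logic: a run is
    complete at 100%, accepted at the fallback threshold, otherwise stuck."""
    if expected <= 0:
        raise ValueError("expected row count must be positive")
    ratio = received / expected
    if ratio >= 1.0:
        return "complete"
    if ratio >= fallback:
        return "complete-fallback"
    return "incomplete"

# With the counts from this run, the check can never pass:
# completion_status(1757, 4013) -> "incomplete" (ratio ~ 0.44)
```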

Root Cause

The discrepancy is caused by how the failure relationship of the DQAValidator processor is handled in the deployed flow.

When a row raises an exception (malformed value, encoding issue, unexpected null, etc.), the processor catches it and routes the FlowFile to failure:

# DQAValidator.py, transform()
except Exception as e:
    self.logger.error(str(e))
    self.logger.error(traceback.format_exc())
    return FlowFileTransformResult(relationship="failure")

The exception handling in DQAValidator.py itself is correct: per-row errors are caught, logged, and routed to failure instead of crashing the processor.
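As a standalone illustration of that behavior (the FlowFileTransformResult class below is a minimal stand-in for the real nifiapi import, and the validation checks are made-up examples, not the actual DQA rules):

```python
import json
import logging
import traceback

class FlowFileTransformResult:
    # Minimal stand-in for nifiapi's result type, so this sketch runs outside NiFi.
    def __init__(self, relationship, contents=None, attributes=None):
        self.relationship = relationship
        self.contents = contents
        self.attributes = attributes or {}

def transform_row(row_bytes: bytes) -> FlowFileTransformResult:
    try:
        row = json.loads(row_bytes)   # raises on malformed content
        if row.get("id") is None:     # example of an unexpected null
            raise ValueError("row is missing required field 'id'")
        return FlowFileTransformResult(relationship="success", contents=row_bytes)
    except Exception as e:
        logging.error("%s\n%s", e, traceback.format_exc())
        # The row disappears here if 'failure' is auto-terminated downstream.
        return FlowFileTransformResult(relationship="failure")
```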

The issue arises when the failure relationship in the deployed flow is auto-terminated (the default in many NiFi setups). In that case:

  • The FlowFile is silently discarded
  • No Kafka message is published
  • The Quality Reporter never sees the row
  • The platform’s completion logic never accounts for it

As a result, rows are lost without visibility.

Relevant Files

  • nifi-flows/moderate-dynamic-rules-kafka-ingestion-flow.json — deployed flow definition
  • ansible-configurator/NiFi_Processors/vendored/dqa-validator/DQAValidator.py — processor implementation

Proposed Fix: Route failure to a Dead-Letter Kafka Topic (Recommended)

Instead of auto-terminating failure, route it to a Kafka topic so failed rows are observable and auditable.

Steps (NiFi UI)

  1. Open the DQAValidator processor.

  2. Go to the Relationships tab.

  3. If failure is set to auto-terminate, uncheck it.

  4. Add a PublishKafka (or PublishKafka_2_6) processor.

  5. Configure it to publish to a topic such as:

    validation-failures
    

    (Create the topic in Kafka if necessary.)

  6. Create a connection:

    DQAValidator (failure) → PublishKafka
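If the validation-failures topic does not already exist, it can be created with the standard Kafka CLI (the broker address, partition count, and replication factor below are assumptions for a single-broker setup; adjust to the real cluster):

```shell
# Create the dead-letter topic; point --bootstrap-server at the actual broker.
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic validation-failures \
  --partitions 3 --replication-factor 1
```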
    

The message content will be the original FlowFile (JSON-encoded row), which is sufficient for inspection and replay if needed.
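A consumer of the dead-letter topic then only needs to decode that JSON payload. A hypothetical inspection helper (the name and returned fields are assumptions) might look like:

```python
import json

def summarize_failed_row(message_value: bytes) -> dict:
    """Summarize one message from the validation-failures topic,
    assuming the payload is the original JSON-encoded row."""
    row = json.loads(message_value)
    return {
        "columns": sorted(row),
        "null_fields": sorted(k for k, v in row.items() if v is None),
    }
```

Replay can then be as simple as re-publishing the same bytes to the ingestion topic once the underlying data problem is fixed.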

Deployment Update

Update the flow definition:

nifi-flows/moderate-dynamic-rules-kafka-ingestion-flow.json

so the new connection is included in Ansible-managed deployments.


Labels

enhancement (New feature or request)
