[fix][io] Interrupt Kafka Source instance thread on consumer fatal error to prevent deadlock #27

Open
Praveenkumar76 wants to merge 5 commits into apache:master from cognitree:fix/kafka-source-liveness-deadlock

Praveenkumar76 commented May 23, 2026

Fixes apache#25290

Motivation

The Kafka Source connector can enter a liveness failure when its consumer thread encounters a fatal error. In such cases, the connector reports the error via notifyError(), which relies on the framework’s instance thread to read and handle it.

However, if the instance thread is blocked in sendOutputMessage() (for example, waiting on network I/O to the Pulsar broker), it does not return to readNext() and therefore never processes the error queue. As a result, the consumer thread terminates, the instance thread remains stuck, and the connector pod continues running without doing useful work. This leads to a “zombie” state where Kubernetes health checks still pass, but the connector is effectively dead.

Modifications

  • Captured the framework’s main instanceThread reference during the open() method.
  • Updated the Kafka consumer runnerThread error handling:
    • In both the catch block and UncaughtExceptionHandler, when a fatal exception occurs, the consumer thread now interrupts the instanceThread.
  • This interruption breaks the blocking call (e.g., CompletableFuture.get() or socket wait) in sendOutputMessage(), allowing the instance thread to wake up, return to readNext(), and handle the error that the consumer thread queued via notifyError().
  • Ensured graceful shutdown behavior so that the failure propagates correctly and Kubernetes can restart the pod.
  • Moved consumer.subscribe() outside the while (running) loop to properly surface initialization errors.
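
The interrupt-on-fatal-error idea in the list above can be sketched with plain JDK classes. This is a minimal illustration, not the code in this PR: the class InterruptOnFatalErrorSketch, its errors queue, and failFromConsumerThread() are hypothetical stand-ins for the connector, the queue fed by notifyError(), and the consumer thread's catch block. A CompletableFuture that never completes stands in for a blocked sendOutputMessage().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the source connector; not a Pulsar class.
class InterruptOnFatalErrorSketch {

    // Stand-in for the error queue that notifyError() feeds.
    private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
    private final CountDownLatch opened = new CountDownLatch(1);
    // Captured in open(): the thread that later blocks while sending.
    private volatile Thread instanceThread;

    void open() {
        instanceThread = Thread.currentThread();
        opened.countDown();
    }

    // Called from the consumer thread when it hits a fatal error.
    void failFromConsumerThread(Throwable cause) {
        errors.add(cause);          // queue first, so the error is visible after wake-up
        Thread target = instanceThread;
        if (target != null) {
            target.interrupt();     // break the blocking call in the instance thread
        }
    }

    // Runs the scenario and returns the error the instance thread observed, or null.
    static Throwable runScenario() {
        InterruptOnFatalErrorSketch source = new InterruptOnFatalErrorSketch();
        AtomicReference<Throwable> observed = new AtomicReference<>();
        CompletableFuture<Void> pendingSend = new CompletableFuture<>(); // never completed

        Thread instance = new Thread(() -> {
            source.open();
            try {
                pendingSend.get();  // blocks like a send that never finishes
            } catch (InterruptedException e) {
                // Woken by the consumer thread: pick up the queued error.
                observed.set(source.errors.poll());
            } catch (ExecutionException e) {
                observed.set(e.getCause());
            }
        }, "instance-thread");

        Thread consumer = new Thread(() -> {
            try {
                source.opened.await();  // wait until open() has captured the thread
                throw new IllegalStateException("simulated fatal consumer error");
            } catch (Exception e) {
                source.failFromConsumerThread(e);
            }
        }, "consumer-thread");

        // Daemon threads so a failed scenario cannot keep the JVM alive.
        instance.setDaemon(true);
        consumer.setDaemon(true);
        instance.start();
        consumer.start();
        try {
            instance.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed.get();
    }

    public static void main(String[] args) {
        System.out.println("instance thread observed: " + runScenario());
    }
}
```

Queuing the error before calling interrupt() matters in this sketch: the instance thread only looks at the queue after it wakes, so the error has to be there by then.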

Verifying this change

  • Verified that the connector no longer remains stuck when the consumer thread fails.
  • Confirmed that the instance thread is interrupted, wakes up, processes the error, and exits cleanly.

This change added tests and can be verified as follows:

  • Added an integration test: KafkaSourceDeadlockTest.java
  • Test design:
    • Uses Testcontainers to run a Kafka broker.
    • Simulates a fatal failure in the consumer thread.
    • Simulates a blocked sendOutputMessage() in the instance thread.
  • Assertion:
    • The consumer thread interrupts the instance thread.
    • The deadlock is broken and the connector exits instead of hanging.
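
The shape of the assertion above can be sketched without Kafka or Testcontainers. The following is not KafkaSourceDeadlockTest.java; BlockedThreadExitSketch, exitsAfter(), and demo() are hypothetical names, and a CountDownLatch that is never released stands in for the blocked sendOutputMessage(). It checks that a blocked thread terminates within a timeout once it is interrupted, and stays blocked when it is not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; illustrates the assertion only, not the PR's test.
class BlockedThreadExitSketch {

    // Runs the trigger, then reports whether `blocked` terminated within the timeout.
    static boolean exitsAfter(Runnable trigger, Thread blocked, long timeoutMillis) {
        trigger.run();
        try {
            blocked.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !blocked.isAlive();
    }

    // interrupt == true models the fix; false models the original hang.
    static boolean demo(boolean interrupt) {
        CountDownLatch never = new CountDownLatch(1);   // never counted down
        CountDownLatch started = new CountDownLatch(1);
        Thread blocked = new Thread(() -> {
            started.countDown();
            try {
                never.await();      // stands in for a blocked sendOutputMessage()
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread exit.
            }
        }, "blocked-instance-thread");
        blocked.setDaemon(true);    // do not keep the JVM alive in the hang case
        blocked.start();
        try {
            started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        Runnable trigger = interrupt ? blocked::interrupt : () -> { };
        return exitsAfter(trigger, blocked, interrupt ? 5000 : 200);
    }

    public static void main(String[] args) {
        System.out.println("exits when interrupted: " + demo(true));
        System.out.println("exits without interrupt: " + demo(false));
    }
}
```

The uninterrupted case serves as a negative control: without it, a test of this kind could pass even if the thread was never actually blocked.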

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Development

Successfully merging this pull request may close these issues.

[Bug] KafkaAbstractSource consumer thread fatal error is silently lost when instance thread is blocked in sendOutputMessage()