Expose TopicPartition and offset from ProcessingContext #216

@bitterfox


We are developing a Kafka consumer using Decaton that works as follows:

  • Consume messages from Kafka
  • Write the messages into a file, building a large file that contains 10K or more messages
    • We would like to create 100MB+ files; if a single message is 300B and the compression ratio is 50%, each file will contain about 700K messages
    • Thus we set decaton.max.pending.records to a huge value such as 100K or 1M
  • Commit offsets only for messages that have been persisted in the file
    • We use external storage, and we assume the data is persisted once the file descriptor is closed successfully
    • So we don't want to commit offsets until we close the file
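
For reference, the 700K estimate above can be reproduced with a small calculation. This is only a sketch: it assumes "100MB" means 100 MiB and that a 50% compression ratio means each message takes half its original size on disk. The function name is made up for illustration.

```kotlin
// Rough estimate of how many messages fit into one output file.
// compressionRatio is assumed to be compressedSize / originalSize (0.5 = half size).
fun messagesPerFile(targetFileBytes: Long, messageBytes: Long, compressionRatio: Double): Long =
    (targetFileBytes / (messageBytes * compressionRatio)).toLong()
```

Calling `messagesPerFile(100L * 1024 * 1024, 300, 0.5)` gives roughly 700K, which is why decaton.max.pending.records has to be in the 100K to 1M range.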

We used deferred completion and tried to hold every deferred completion until the file was closed; however, we found that this requires a huge heap, and without it we hit an OutOfMemoryError.
In the end, we solved this by keeping only the deferred completion for the smallest offset of each topic partition per file.
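
To illustrate the idea, here is a minimal sketch of such a tracker. It does not use Decaton types: `PendingCompletion`, `PartitionKey` and `FileCompletionTracker` are hypothetical stand-ins, and it assumes that completing the higher offsets right away is safe because the commit cannot advance past the earliest incomplete offset of the partition. It is not thread-safe.

```kotlin
// Hypothetical stand-in for a deferred completion handle (not the Decaton type).
fun interface PendingCompletion {
    fun complete()
}

// Plain (topic, partition) key so that the sketch has no Kafka dependency.
data class PartitionKey(val topic: String, val partition: Int)

/** For one output file, holds only the lowest-offset pending completion per partition. */
class FileCompletionTracker {
    private data class Held(val offset: Long, val completion: PendingCompletion)

    private val earliest = HashMap<PartitionKey, Held>()

    /** Call once for every record written to the current file. */
    fun track(key: PartitionKey, offset: Long, completion: PendingCompletion) {
        val held = earliest[key]
        when {
            // First record of this partition in the file: hold its completion.
            held == null -> earliest[key] = Held(offset, completion)
            offset < held.offset -> {
                // The new record is earlier: hold it and release the one held so far.
                earliest[key] = Held(offset, completion)
                held.completion.complete()
            }
            // A lower offset is already held, so this one can be released immediately.
            else -> completion.complete()
        }
    }

    /** Call after the file has been closed successfully. */
    fun releaseAll() {
        earliest.values.forEach { it.completion.complete() }
        earliest.clear()
    }

    /** Number of completions currently held (one per partition seen in this file). */
    fun heldCount(): Int = earliest.size
}
```

With this shape the heap cost is one held completion per topic partition per file instead of one per message, but `track` needs the topic partition and offset of every record, which is the part we currently cannot get cleanly.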

This works well; however, we need ugly and fragile code to get the topic partition and offset of the current message from the processing context, like this:

        if (context is ProcessingContextImpl) {
            try {
                val clazz = ProcessingContextImpl::class.java
                val requestField = clazz.getDeclaredField("request")
                requestField.isAccessible = true
                val request = requestField.get(context) as TaskRequest
                val topicPartition = request.topicPartition()
                val offset = request.recordOffset()
                return Pair(topicPartition, offset)
            } catch (e: ReflectiveOperationException) {
                logger.debug("Fallback to take topicPartitionAndOffset from logging context", e)
            }
        }

        context.loggingContext().use {
            val topic = MDC.get(LoggingContext.TOPIC_KEY)
            if (topic == null) {
                logger.warn(
                    "Cannot find topicPartitionAndOffset from MDC, " +
                        "configure decaton.logging.mdc.enabled=true",
                )
                return Pair(TopicPartition("dummy", 0), 0)
            }
            val partition = MDC.get(LoggingContext.PARTITION_KEY).toInt()
            val offset = MDC.get(LoggingContext.OFFSET_KEY).toLong()

            return Pair(TopicPartition(topic, partition), offset)
        }

I'm wondering whether ProcessingContext could provide this information officially; it would be useful for advanced use cases like ours.

If you would rather not expose Kafka interfaces, I'd like to have an object with methods to

  • check whether task A comes from the same group (the topic partition, for Kafka) as another task B (A.topic-partition == B.topic-partition?)
  • check whether task A is prior to another task B in the same group (A.offset < B.offset?)
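
As a rough sketch of what such an object could look like (all names here are hypothetical and not part of Decaton; this is only meant to show the shape of the API I have in mind):

```kotlin
/** Hypothetical, Kafka-agnostic view of where a task came from. */
interface TaskOrigin {
    /** True if both tasks come from the same group (the same topic partition for Kafka). */
    fun isSameGroup(other: TaskOrigin): Boolean

    /** True if this task comes before the other one; only defined within the same group. */
    fun isPriorTo(other: TaskOrigin): Boolean
}

/** Possible Kafka-backed implementation; the fields need not be exposed to processors. */
data class KafkaTaskOrigin(
    val topic: String,
    val partition: Int,
    val offset: Long,
) : TaskOrigin {
    override fun isSameGroup(other: TaskOrigin): Boolean =
        other is KafkaTaskOrigin && topic == other.topic && partition == other.partition

    override fun isPriorTo(other: TaskOrigin): Boolean {
        // Offsets are only comparable within one topic partition.
        require(isSameGroup(other)) { "tasks belong to different groups" }
        return offset < (other as KafkaTaskOrigin).offset
    }
}
```

A processor would then only depend on `TaskOrigin`, so the Kafka types stay hidden while the two checks above remain possible.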
