We develop a Kafka consumer using Decaton that:
- Consumes messages from Kafka
- Writes messages into a file, producing large files containing 10K+ messages
  - We would like to create 100MB+ files; if a single message is 300B and the compression ratio is 50%, one file will contain about 700K messages
  - Thus we specify a huge `decaton.max.pending.records`, like 100K or 1M
- Commits offsets only after all messages are persisted in the file
  - We use external storage, and we assume data is persisted once the file descriptor is closed successfully
  - So we don't want to commit offsets until we close it
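Roughly, the write-and-roll flow looks like the following minimal sketch (the class, file naming, and `onFileClosed` hook are illustrative assumptions, not our production code):

```kotlin
import java.io.File

/**
 * Writes records into a file and rolls to a new file once a size threshold
 * is reached. Data is assumed durable once the file descriptor is closed
 * successfully, so onFileClosed is the safe point to commit offsets.
 */
class RollingFileWriter(
    private val dir: File,
    private val maxBytes: Long = 100L * 1024 * 1024,
    private val onFileClosed: (File) -> Unit,
) {
    private var current: File? = null
    private var out: java.io.OutputStream? = null
    private var written = 0L
    private var seq = 0

    fun write(record: ByteArray) {
        if (out == null) open()
        out!!.write(record)
        written += record.size
        if (written >= maxBytes) close()
    }

    private fun open() {
        current = File(dir, "part-%05d".format(seq++))
        out = current!!.outputStream().buffered()
        written = 0
    }

    fun close() {
        out?.flush()
        out?.close()                // durability assumed once the fd closes
        out = null
        current?.let(onFileClosed)  // safe point to commit offsets for this file
        current = null
    }
}
```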
We used deferred completion and tried to keep all deferred completions, but we found this requires a huge heap, otherwise we hit OOME.
We finally solved this by keeping only the deferred completion for the smallest offset of each topic partition per file.
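That bookkeeping can be sketched self-containedly as follows; `Completion` here is a stand-in for Decaton's deferred-completion handle, and all names are illustrative, not Decaton's API:

```kotlin
// Stand-in for Decaton's deferred-completion handle (illustrative only).
fun interface Completion {
    fun complete()
}

data class TopicPartition(val topic: String, val partition: Int)

/**
 * Keeps only the completion for the smallest offset per topic partition for
 * the current file. Since the consumer can only commit up to the earliest
 * uncompleted offset, holding just that one completion gates the commit;
 * later completions for the same partition can be completed immediately.
 */
class PerFileCompletionTracker {
    // Smallest pending offset and its completion, per partition.
    private val pending = mutableMapOf<TopicPartition, Pair<Long, Completion>>()

    fun onRecord(tp: TopicPartition, offset: Long, completion: Completion) {
        val current = pending[tp]
        if (current == null || offset < current.first) {
            current?.second?.complete()
            pending[tp] = offset to completion
        } else {
            // Not the smallest offset for this partition: completing it now
            // is safe because the held smallest offset still blocks the commit.
            completion.complete()
        }
    }

    /** Called after the file descriptor is closed successfully. */
    fun onFileClosed() {
        pending.values.forEach { it.second.complete() }
        pending.clear()
    }
}
```

This keeps heap usage proportional to the number of partitions rather than the number of pending messages.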
This worked well; however, we need ugly and fragile code to obtain the topic partition and offset of the current message from the processing context, like:
```kotlin
if (context is ProcessingContextImpl) {
    try {
        val clazz = ProcessingContextImpl::class.java
        val requestField = clazz.getDeclaredField("request")
        requestField.isAccessible = true
        val request = requestField.get(context) as TaskRequest
        val topicPartition = request.topicPartition()
        val offset = request.recordOffset()
        return Pair(topicPartition, offset)
    } catch (e: ReflectiveOperationException) {
        logger.debug("Fallback to take topicPartitionAndOffset from logging context", e)
    }
}
context.loggingContext().use {
    val topic = MDC.get(LoggingContext.TOPIC_KEY)
    if (topic == null) {
        logger.warn(
            "Cannot find topicPartitionAndOffset from MDC, " +
                "configure decaton.logging.mdc.enabled=true",
        )
        return Pair(TopicPartition("dummy", 0), 0)
    }
    val partition = MDC.get(LoggingContext.PARTITION_KEY).toInt()
    val offset = MDC.get(LoggingContext.OFFSET_KEY).toLong()
    return Pair(TopicPartition(topic, partition), offset)
}
```
I'm wondering if ProcessingContext could provide such info officially; it would be useful for some use cases for advanced users like us.
If you don't want to expose Kafka's interfaces, I'd like to have an opaque object with methods to:
- check whether task A comes from the same group as another task B (for Kafka, the same topic partition: A.topic-partition == B.topic-partition?)
- check whether task A is prior to another task B in the same group (A.offset < B.offset?)
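For example, such an object might look like the following sketch (every name here is hypothetical, not an existing Decaton API):

```kotlin
/**
 * Hypothetical opaque handle that ProcessingContext could expose.
 * It answers grouping and ordering questions without leaking Kafka types.
 */
interface TaskOrigin {
    /** True if this task comes from the same group (topic partition) as other. */
    fun sameGroupAs(other: TaskOrigin): Boolean

    /** True if this task is prior to other within the same group (smaller offset). */
    fun isPriorTo(other: TaskOrigin): Boolean
}

// One possible Kafka-backed implementation (illustrative only).
data class KafkaTaskOrigin(
    val topic: String,
    val partition: Int,
    val offset: Long,
) : TaskOrigin {
    override fun sameGroupAs(other: TaskOrigin): Boolean =
        other is KafkaTaskOrigin && topic == other.topic && partition == other.partition

    override fun isPriorTo(other: TaskOrigin): Boolean =
        other is KafkaTaskOrigin && sameGroupAs(other) && offset < other.offset
}
```

With this, our per-file bookkeeping could compare tasks directly instead of reflecting into ProcessingContextImpl or parsing MDC values.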