Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -137,34 +137,39 @@ internal object ConsumerSchemaHelpers {
val scheduledAt = resultSet.getTimestamp(columnIndex++).time
val attempts = resultSet.getInt(columnIndex++)

val data = when (queue.databaseDataType) {
is DatabaseQueueDataType.Json -> {
val data = resultSet.getString(columnIndex++)
approxBytesCounter.addString(data)
queue.databaseDataType.deserializer(data)
}
val id = Id(localId, shard)
val data = try {
when (queue.databaseDataType) {
is DatabaseQueueDataType.Json -> {
val data = resultSet.getString(columnIndex++)
approxBytesCounter.addString(data)
queue.databaseDataType.deserializer(data)
}

is DatabaseQueueDataType.Binary -> {
val data = resultSet.getBytes(columnIndex++)
approxBytesCounter.addByteArray(data)
queue.databaseDataType.deserializer(data)
}
is DatabaseQueueDataType.Binary -> {
val data = resultSet.getBytes(columnIndex++)
approxBytesCounter.addByteArray(data)
queue.databaseDataType.deserializer(data)
}

is DatabaseQueueDataType.Text -> {
val data = resultSet.getString(columnIndex++)
approxBytesCounter.addString(data)
queue.databaseDataType.deserializer(data)
}
is DatabaseQueueDataType.Text -> {
val data = resultSet.getString(columnIndex++)
approxBytesCounter.addString(data)
queue.databaseDataType.deserializer(data)
}

is DatabaseQueueDataType.Int -> {
approxBytesCounter.addInt()
queue.databaseDataType.deserializer(resultSet.getInt(columnIndex++))
}
is DatabaseQueueDataType.Int -> {
approxBytesCounter.addInt()
queue.databaseDataType.deserializer(resultSet.getInt(columnIndex++))
}

is DatabaseQueueDataType.Long -> {
approxBytesCounter.addLong()
queue.databaseDataType.deserializer(resultSet.getLong(columnIndex++))
is DatabaseQueueDataType.Long -> {
approxBytesCounter.addLong()
queue.databaseDataType.deserializer(resultSet.getLong(columnIndex++))
}
}
} catch (e: Exception) {
throw MessageDeserializationException(id, e)
}

val meta = if (receiveOptions.readMetadata && queue.metadata.fields.isNotEmpty()) {
Expand Down Expand Up @@ -192,7 +197,7 @@ internal object ConsumerSchemaHelpers {
}

val message = Message(
id = Id(localId, shard),
id = id,
createdAt = createdAt,
processingAt = processingAt,
scheduledAt = scheduledAt,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kolbasa.consumer

import kolbasa.producer.Id

/**
* Thrown by [ConsumerSchemaHelpers.read] when a queue row's payload cannot be deserialized into a [Message].
*
* The receive CTE has already updated `remaining_attempts` and `scheduled_at` for the row by the time
* deserialization runs, so [kolbasa.consumer.connection.ConnectionAwareDatabaseConsumer] catches this
* exception and skips the row instead of letting it propagate. If it were rethrown, the surrounding
* transaction would roll back the visibility-timeout bookkeeping and the poison message would be
* re-selected on every tick, blocking the queue.
*/
class MessageDeserializationException(
val id: Id,
cause: Throwable
) : RuntimeException("Failed to deserialize message $id: ${cause.message}", cause)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kolbasa.cluster.Shards
import kolbasa.consumer.ConsumerOptions
import kolbasa.consumer.ConsumerSchemaHelpers
import kolbasa.consumer.Message
import kolbasa.consumer.MessageDeserializationException
import kolbasa.consumer.ReceiveOptions
import kolbasa.consumer.sweep.SweepHelper
import kolbasa.producer.Id
Expand Down Expand Up @@ -69,7 +70,14 @@ class ConnectionAwareDatabaseConsumer internal constructor(
val result = ArrayList<Message<Data>>(limit)

while (resultSet.next()) {
result += ConsumerSchemaHelpers.read(queue, receiveOptions, resultSet, approxBytesCounter)
try {
result += ConsumerSchemaHelpers.read(queue, receiveOptions, resultSet, approxBytesCounter)
} catch (_: MessageDeserializationException) {
// Skip the row. The receive CTE has already decremented remaining_attempts and pushed
// scheduled_at, so the row will be retried (and eventually expire / move to DLQ).
// Rethrowing would roll back the surrounding transaction together with that bookkeeping,
// pinning the poison message at the head of the queue forever.
}
}

result
Expand Down