diff --git a/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt b/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt index c1fd785..50a44fd 100644 --- a/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt +++ b/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt @@ -137,34 +137,39 @@ internal object ConsumerSchemaHelpers { val scheduledAt = resultSet.getTimestamp(columnIndex++).time val attempts = resultSet.getInt(columnIndex++) - val data = when (queue.databaseDataType) { - is DatabaseQueueDataType.Json -> { - val data = resultSet.getString(columnIndex++) - approxBytesCounter.addString(data) - queue.databaseDataType.deserializer(data) - } + val id = Id(localId, shard) + val data = try { + when (queue.databaseDataType) { + is DatabaseQueueDataType.Json -> { + val data = resultSet.getString(columnIndex++) + approxBytesCounter.addString(data) + queue.databaseDataType.deserializer(data) + } - is DatabaseQueueDataType.Binary -> { - val data = resultSet.getBytes(columnIndex++) - approxBytesCounter.addByteArray(data) - queue.databaseDataType.deserializer(data) - } + is DatabaseQueueDataType.Binary -> { + val data = resultSet.getBytes(columnIndex++) + approxBytesCounter.addByteArray(data) + queue.databaseDataType.deserializer(data) + } - is DatabaseQueueDataType.Text -> { - val data = resultSet.getString(columnIndex++) - approxBytesCounter.addString(data) - queue.databaseDataType.deserializer(data) - } + is DatabaseQueueDataType.Text -> { + val data = resultSet.getString(columnIndex++) + approxBytesCounter.addString(data) + queue.databaseDataType.deserializer(data) + } - is DatabaseQueueDataType.Int -> { - approxBytesCounter.addInt() - queue.databaseDataType.deserializer(resultSet.getInt(columnIndex++)) - } + is DatabaseQueueDataType.Int -> { + approxBytesCounter.addInt() + queue.databaseDataType.deserializer(resultSet.getInt(columnIndex++)) + } - is DatabaseQueueDataType.Long -> { - approxBytesCounter.addLong() - queue.databaseDataType.deserializer(resultSet.getLong(columnIndex++)) + is DatabaseQueueDataType.Long -> { + approxBytesCounter.addLong() + queue.databaseDataType.deserializer(resultSet.getLong(columnIndex++)) + } } + } catch (e: Exception) { + throw MessageDeserializationException(id, e) } val meta = if (receiveOptions.readMetadata && queue.metadata.fields.isNotEmpty()) { @@ -192,7 +197,7 @@ internal object ConsumerSchemaHelpers { } val message = Message( - id = Id(localId, shard), + id = id, createdAt = createdAt, processingAt = processingAt, scheduledAt = scheduledAt, diff --git a/src/main/kotlin/kolbasa/consumer/MessageDeserializationException.kt b/src/main/kotlin/kolbasa/consumer/MessageDeserializationException.kt new file mode 100644 index 0000000..1cd58c7 --- /dev/null +++ b/src/main/kotlin/kolbasa/consumer/MessageDeserializationException.kt @@ -0,0 +1,17 @@ +package kolbasa.consumer + +import kolbasa.producer.Id + +/** + * Thrown by [ConsumerSchemaHelpers.read] when a queue row's payload cannot be deserialized into a [Message]. + * + * The receive CTE has already updated `remaining_attempts` and `scheduled_at` for the row by the time + * deserialization runs, so [kolbasa.consumer.connection.ConnectionAwareDatabaseConsumer] catches this + * exception and skips the row instead of letting it propagate. If it were rethrown, the surrounding + * transaction would roll back the visibility-timeout bookkeeping and the poison message would be + * re-selected on every tick, blocking the queue. + */ +class MessageDeserializationException( + val id: Id, + cause: Throwable +) : RuntimeException("Failed to deserialize message $id: ${cause.message}", cause) diff --git a/src/main/kotlin/kolbasa/consumer/connection/ConnectionAwareDatabaseConsumer.kt b/src/main/kotlin/kolbasa/consumer/connection/ConnectionAwareDatabaseConsumer.kt index 7b620cd..fbf469e 100644 --- a/src/main/kotlin/kolbasa/consumer/connection/ConnectionAwareDatabaseConsumer.kt +++ b/src/main/kotlin/kolbasa/consumer/connection/ConnectionAwareDatabaseConsumer.kt @@ -4,6 +4,7 @@ import kolbasa.cluster.Shards import kolbasa.consumer.ConsumerOptions import kolbasa.consumer.ConsumerSchemaHelpers import kolbasa.consumer.Message +import kolbasa.consumer.MessageDeserializationException import kolbasa.consumer.ReceiveOptions import kolbasa.consumer.sweep.SweepHelper import kolbasa.producer.Id @@ -69,7 +70,14 @@ class ConnectionAwareDatabaseConsumer internal constructor( val result = ArrayList>(limit) while (resultSet.next()) { - result += ConsumerSchemaHelpers.read(queue, receiveOptions, resultSet, approxBytesCounter) + try { + result += ConsumerSchemaHelpers.read(queue, receiveOptions, resultSet, approxBytesCounter) + } catch (_: MessageDeserializationException) { + // Skip the row. The receive CTE has already decremented remaining_attempts and pushed + // scheduled_at, so the row will be retried (and eventually expire / move to DLQ). + // Rethrowing would roll back the surrounding transaction together with that bookkeeping, + // pinning the poison message at the head of the queue forever. + } } result