From 909c3804fa2e3fe24e78802b9064493537703fc5 Mon Sep 17 00:00:00 2001 From: Vasily Vasilkov Date: Sat, 6 Jun 2026 10:39:29 +0400 Subject: [PATCH] Eliminate ugly conversions with the new Instant meta field --- .../kolbasa/consumer/ConsumerSchemaHelpers.kt | 20 ++++++------ .../kotlin/kolbasa/queue/meta/Metadata.kt | 31 ++++++++++--------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt b/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt index c1fd785..26cfe05 100644 --- a/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt +++ b/src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt @@ -280,10 +280,10 @@ internal object ConsumerSchemaHelpers { add(Const.DATA_COLUMN_NAME) mainQueue.metadata.fields.forEach { add(it.dbColumnName) } // Original values - add(Const.ID_COLUMN_NAME) // original id - add("(extract(epoch from ${Const.CREATED_AT_COLUMN_NAME}) * 1000)::bigint") // original created_at - add("(extract(epoch from ${Const.PROCESSING_AT_COLUMN_NAME}) * 1000)::bigint") // original processing_at - add("(extract(epoch from ${Const.SCHEDULED_AT_COLUMN_NAME}) * 1000)::bigint") // original scheduled_at + add(Const.ID_COLUMN_NAME) // original id + add(Const.CREATED_AT_COLUMN_NAME) // original created_at + add(Const.PROCESSING_AT_COLUMN_NAME) // original processing_at + add(Const.SCHEDULED_AT_COLUMN_NAME) // original scheduled_at } val selectStr = selectExprs.joinToString(",") @@ -349,18 +349,18 @@ internal object ConsumerSchemaHelpers { // SELECT expressions val selectExprs = buildList { - add("statement_timestamp()") // scheduled_at = now - add("${archiveQueue.options.defaultAttempts}") // remaining_attempts (high value) + add("statement_timestamp()") // scheduled_at = now + add("${archiveQueue.options.defaultAttempts}") // remaining_attempts (high value) add(Const.SHARD_COLUMN_NAME) add(Const.PRODUCER_COLUMN_NAME) add(Const.CONSUMER_COLUMN_NAME) add(Const.DATA_COLUMN_NAME) mainQueue.metadata.fields.forEach { add(it.dbColumnName) } // Original values - add(Const.ID_COLUMN_NAME) // original id - add("(extract(epoch from ${Const.CREATED_AT_COLUMN_NAME}) * 1000)::bigint") // original created_at - add(Const.REMAINING_ATTEMPTS_COLUMN_NAME) // original remaining_attempts - add("(extract(epoch from ${Const.PROCESSING_AT_COLUMN_NAME}) * 1000)::bigint") // original processing_at + add(Const.ID_COLUMN_NAME) // original id + add(Const.CREATED_AT_COLUMN_NAME) // original created_at + add(Const.REMAINING_ATTEMPTS_COLUMN_NAME) // original remaining_attempts + add(Const.PROCESSING_AT_COLUMN_NAME) // original processing_at } val selectStr = selectExprs.joinToString(",") diff --git a/src/main/kotlin/kolbasa/queue/meta/Metadata.kt b/src/main/kotlin/kolbasa/queue/meta/Metadata.kt index 5196da6..894ab24 100644 --- a/src/main/kotlin/kolbasa/queue/meta/Metadata.kt +++ b/src/main/kotlin/kolbasa/queue/meta/Metadata.kt @@ -1,6 +1,7 @@ package kolbasa.queue.meta import kolbasa.schema.Const +import java.time.Instant /** * Metadata for a queue. @@ -32,17 +33,17 @@ data class Metadata(val fields: List>) { val DLQ_ORIGINAL_ID: MetaField = LongField("original_${Const.ID_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) - /** Original created_at timestamp (epoch millis) from the source queue */ - val DLQ_ORIGINAL_CREATED_AT: MetaField = - LongField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) + /** Original created_at timestamptz from the source queue */ + val DLQ_ORIGINAL_CREATED_AT: MetaField = + InstantField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) - /** Original processing_at timestamp (epoch millis) from the source queue */ - val DLQ_ORIGINAL_PROCESSING_AT: MetaField = - LongField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) + /** Original processing_at timestamptz from the source queue */ + val DLQ_ORIGINAL_PROCESSING_AT: MetaField = + InstantField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) - /** Original scheduled_at timestamp (epoch millis) from the source queue */ - val DLQ_ORIGINAL_SCHEDULED_AT: MetaField = - LongField("original_${Const.SCHEDULED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) + /** Original scheduled_at timestamptz from the source queue */ + val DLQ_ORIGINAL_SCHEDULED_AT: MetaField = + InstantField("original_${Const.SCHEDULED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE) val DLQ_FIELDS = listOf( DLQ_ORIGINAL_ID, @@ -59,17 +60,17 @@ data class Metadata(val fields: List>) { val ARCHIVE_ORIGINAL_ID: MetaField = LongField("original_${Const.ID_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE) - /** Original created_at timestamp (epoch millis) from the source queue */ - val ARCHIVE_ORIGINAL_CREATED_AT: MetaField = - LongField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE) + /** Original created_at timestamptz from the source queue */ + val ARCHIVE_ORIGINAL_CREATED_AT: MetaField = + InstantField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE) /** Original remaining_attempts from the source queue */ val ARCHIVE_ORIGINAL_REMAINING_ATTEMPTS: MetaField = IntField("original_${Const.REMAINING_ATTEMPTS_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE) - /** Original processing_at timestamp (epoch millis) from the source queue */ - val ARCHIVE_ORIGINAL_PROCESSING_AT: MetaField = - LongField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE) + /** Original processing_at timestamptz from the source queue */ + val ARCHIVE_ORIGINAL_PROCESSING_AT: MetaField = + InstantField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE) val ARCHIVE_FIELDS = listOf( ARCHIVE_ORIGINAL_ID,