Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/main/kotlin/kolbasa/consumer/ConsumerSchemaHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ internal object ConsumerSchemaHelpers {
add(Const.DATA_COLUMN_NAME)
mainQueue.metadata.fields.forEach { add(it.dbColumnName) }
// Original values
add(Const.ID_COLUMN_NAME) // original id
add("(extract(epoch from ${Const.CREATED_AT_COLUMN_NAME}) * 1000)::bigint") // original created_at
add("(extract(epoch from ${Const.PROCESSING_AT_COLUMN_NAME}) * 1000)::bigint") // original processing_at
add("(extract(epoch from ${Const.SCHEDULED_AT_COLUMN_NAME}) * 1000)::bigint") // original scheduled_at
add(Const.ID_COLUMN_NAME) // original id
add(Const.CREATED_AT_COLUMN_NAME) // original created_at
add(Const.PROCESSING_AT_COLUMN_NAME) // original processing_at
add(Const.SCHEDULED_AT_COLUMN_NAME) // original scheduled_at
}
val selectStr = selectExprs.joinToString(",")

Expand Down Expand Up @@ -349,18 +349,18 @@ internal object ConsumerSchemaHelpers {

// SELECT expressions
val selectExprs = buildList {
add("statement_timestamp()") // scheduled_at = now
add("${archiveQueue.options.defaultAttempts}") // remaining_attempts (high value)
add("statement_timestamp()") // scheduled_at = now
add("${archiveQueue.options.defaultAttempts}") // remaining_attempts (high value)
add(Const.SHARD_COLUMN_NAME)
add(Const.PRODUCER_COLUMN_NAME)
add(Const.CONSUMER_COLUMN_NAME)
add(Const.DATA_COLUMN_NAME)
mainQueue.metadata.fields.forEach { add(it.dbColumnName) }
// Original values
add(Const.ID_COLUMN_NAME) // original id
add("(extract(epoch from ${Const.CREATED_AT_COLUMN_NAME}) * 1000)::bigint") // original created_at
add(Const.REMAINING_ATTEMPTS_COLUMN_NAME) // original remaining_attempts
add("(extract(epoch from ${Const.PROCESSING_AT_COLUMN_NAME}) * 1000)::bigint") // original processing_at
add(Const.ID_COLUMN_NAME) // original id
add(Const.CREATED_AT_COLUMN_NAME) // original created_at
add(Const.REMAINING_ATTEMPTS_COLUMN_NAME) // original remaining_attempts
add(Const.PROCESSING_AT_COLUMN_NAME) // original processing_at
}
val selectStr = selectExprs.joinToString(",")

Expand Down
31 changes: 16 additions & 15 deletions src/main/kotlin/kolbasa/queue/meta/Metadata.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kolbasa.queue.meta

import kolbasa.schema.Const
import java.time.Instant

/**
* Metadata for a queue.
Expand Down Expand Up @@ -32,17 +33,17 @@ data class Metadata(val fields: List<MetaField<*>>) {
val DLQ_ORIGINAL_ID: MetaField<Long> =
LongField("original_${Const.ID_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)

/** Original created_at timestamp (epoch millis) from the source queue */
val DLQ_ORIGINAL_CREATED_AT: MetaField<Long> =
LongField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)
/** Original created_at timestamptz from the source queue */
val DLQ_ORIGINAL_CREATED_AT: MetaField<Instant> =
InstantField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)

/** Original processing_at timestamp (epoch millis) from the source queue */
val DLQ_ORIGINAL_PROCESSING_AT: MetaField<Long> =
LongField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)
/** Original processing_at timestamptz from the source queue */
val DLQ_ORIGINAL_PROCESSING_AT: MetaField<Instant> =
InstantField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)

/** Original scheduled_at timestamp (epoch millis) from the source queue */
val DLQ_ORIGINAL_SCHEDULED_AT: MetaField<Long> =
LongField("original_${Const.SCHEDULED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)
/** Original scheduled_at timestamptz from the source queue */
val DLQ_ORIGINAL_SCHEDULED_AT: MetaField<Instant> =
InstantField("original_${Const.SCHEDULED_AT_COLUMN_NAME}${Const.DLQ_TABLE_NAME_SUFFIX}", FieldOption.NONE)

val DLQ_FIELDS = listOf(
DLQ_ORIGINAL_ID,
Expand All @@ -59,17 +60,17 @@ data class Metadata(val fields: List<MetaField<*>>) {
val ARCHIVE_ORIGINAL_ID: MetaField<Long> =
LongField("original_${Const.ID_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE)

/** Original created_at timestamp (epoch millis) from the source queue */
val ARCHIVE_ORIGINAL_CREATED_AT: MetaField<Long> =
LongField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE)
/** Original created_at timestamptz from the source queue */
val ARCHIVE_ORIGINAL_CREATED_AT: MetaField<Instant> =
InstantField("original_${Const.CREATED_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE)

/** Original remaining_attempts from the source queue */
val ARCHIVE_ORIGINAL_REMAINING_ATTEMPTS: MetaField<Int> =
IntField("original_${Const.REMAINING_ATTEMPTS_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE)

/** Original processing_at timestamp (epoch millis) from the source queue */
val ARCHIVE_ORIGINAL_PROCESSING_AT: MetaField<Long> =
LongField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE)
/** Original processing_at timestamptz from the source queue */
val ARCHIVE_ORIGINAL_PROCESSING_AT: MetaField<Instant> =
InstantField("original_${Const.PROCESSING_AT_COLUMN_NAME}${Const.ARCHIVE_TABLE_NAME_SUFFIX}", FieldOption.NONE)

val ARCHIVE_FIELDS = listOf(
ARCHIVE_ORIGINAL_ID,
Expand Down