diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out new file mode 100644 index 00000000000000..bc97e1de4d975b --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !desc -- +id int No true \N +tag varchar(96) Yes false \N NONE +ts0 datetime Yes false \N NONE +dt0 datetime Yes false \N NONE + +-- !select_snapshot -- +1 snapshot_plus01 2024-06-15T18:00 2024-06-15T11:00 + +-- !select_binlog -- +1 snapshot_plus01 2024-06-15T18:00 2024-06-15T11:00 +2 binlog_plus01 2024-06-15T18:00 2024-06-15T11:00 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out new file mode 100644 index 00000000000000..765db60f873ff4 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !desc -- +id int No true \N +tag varchar(96) Yes false \N NONE +ts0 datetime Yes false \N NONE +ts3 datetime(3) Yes false \N NONE +ts6 datetime(6) Yes false \N NONE +dt0 datetime Yes false \N NONE +dt6 datetime(6) Yes false \N NONE +d date Yes false \N NONE + +-- !select_snapshot -- +1 snapshot_plus08 2024-06-15T12:00 2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +2 snapshot_minus05 2024-06-16T01:00 2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +4 snapshot_null \N \N \N \N \N \N +5 snapshot_epoch_plus08 1970-01-01T00:00:01 1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 1970-01-01T08:00:01 1970-01-01T08:00:01.123456 1970-01-01 + +-- !select_binlog -- +1 snapshot_plus08 2024-06-15T14:00 2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +2 snapshot_minus05 2024-06-16T01:00 2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +4 snapshot_null \N \N \N \N \N \N +5 snapshot_epoch_plus08 1970-01-01T00:00:01 1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 1970-01-01T08:00:01 1970-01-01T08:00:01.123456 1970-01-01 +101 binlog_plus08 2024-06-15T12:00 2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +102 binlog_minus05 2024-06-16T01:00 2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +103 binlog_utc 2024-06-15T20:00 2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 2024-06-15T20:00 2024-06-15T20:00:00.123456 2024-06-15 +105 binlog_epoch_plus08 1970-01-01T00:00:01 1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 1970-01-01T08:00:01 1970-01-01T08:00:01.123456 1970-01-01 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out new file mode 100644 index 00000000000000..2691ac4ac12258 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.out @@ -0,0 +1,28 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot_timestamp_pk -- +2024-01-01T00:00 A1 +2024-06-15T12:00:00.123456 B1 +2025-01-01T00:00 C1 +2025-06-15T12:34:56.999999 D1 +2026-01-01T00:00 E1 + +-- !select_snapshot_composite_pk -- +2024-02-01T00:00 1 A2 +2024-02-01T00:00 2 B2 +2024-02-02T12:00:00.500 3 C2 +2024-02-03T23:59:59.999999 4 D2 +2024-02-04T00:00 5 E2 + +-- !select_after_incr_timestamp_pk -- +2024-01-01T00:00 A1 +2024-06-15T12:00:00.123456 B2_upd +2025-01-01T00:00 C1 +2026-01-01T00:00 E1 +2026-06-01T00:00 F2 + +-- !select_after_incr_composite_pk -- +2024-02-01T00:00 1 A2 +2024-02-01T00:00 2 B2 +2024-02-02T12:00:00.500 3 C3_upd +2024-02-04T00:00 5 E2 +2024-02-05T00:00 6 F3 diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out new file mode 100644 index 00000000000000..601f81c7ac90a0 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.out @@ -0,0 +1,14 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !desc -- +id int No true \N +tag text Yes false \N NONE +ts datetime(6) Yes false \N NONE +tstz datetime(6) Yes false \N NONE + +-- !select_snapshot -- +1 snapshot_plus01 2024-06-15T11:00 2024-06-15T18:00 + +-- !select_binlog -- +1 snapshot_plus01 2024-06-15T11:00 2024-06-15T18:00 +2 binlog_plus01 2024-06-15T11:00 2024-06-15T18:00 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out new file mode 100644 index 00000000000000..e221f577163308 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !desc -- +id int No true \N +tag text Yes false \N NONE +ts datetime(6) Yes false \N NONE +tstz0 datetime Yes false \N NONE +tstz3 datetime(3) Yes false \N NONE +tstz6 datetime(6) Yes false \N NONE +ttz text Yes false \N NONE +d date Yes false \N NONE + +-- !select_snapshot -- +1 snapshot_plus08 2024-06-15T20:00 2024-06-15T12:00 2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 20:00:00.123456+08 2024-06-15 +2 snapshot_minus05 2024-06-15T20:00 2024-06-16T01:00 2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 20:00:00.123456-05 2024-06-15 +3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00 2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 20:00:00.123456+00 2024-06-15 +4 snapshot_null \N \N \N \N \N \N +5 snapshot_epoch_plus08 1970-01-01T08:00:01 1970-01-01T00:00:01 1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 08:00:01.123456+08 1970-01-01 + +-- !select_binlog -- +1 snapshot_plus08 2024-06-15T20:00 2024-06-15T14:00 2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 20:00:00.123456+08 2024-06-15 +2 snapshot_minus05 2024-06-15T20:00 2024-06-16T01:00 2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 20:00:00.123456-05 2024-06-15 +3 snapshot_utc 2024-06-15T20:00 2024-06-15T20:00 2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 20:00:00.123456+00 2024-06-15 +4 snapshot_null \N \N \N \N \N \N +5 snapshot_epoch_plus08 1970-01-01T08:00:01 1970-01-01T00:00:01 1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 08:00:01.123456+08 1970-01-01 +101 binlog_plus08 2024-06-15T20:00 2024-06-15T12:00 2024-06-15T12:00:00.123 2024-06-15T12:00:00.123456 20:00:00.123456+08 2024-06-15 +102 binlog_minus05 2024-06-15T20:00 2024-06-16T01:00 2024-06-16T01:00:00.123 2024-06-16T01:00:00.123456 20:00:00.123456-05 2024-06-15 +103 binlog_utc 2024-06-15T20:00 2024-06-15T20:00 2024-06-15T20:00:00.123 2024-06-15T20:00:00.123456 20:00:00.123456+00 2024-06-15 +105 binlog_epoch_plus08 1970-01-01T08:00:01 1970-01-01T00:00:01 1970-01-01T00:00:01.123 1970-01-01T00:00:01.123456 08:00:01.123456+08 1970-01-01 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out new file mode 100644 index 00000000000000..a8ef3d6d9af160 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.out @@ -0,0 +1,42 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot_timestamp_pk -- +2024-01-01T00:00 A1 +2024-06-15T12:00:00.123456 B1 +2025-01-01T00:00 C1 +2025-06-15T12:34:56.999999 D1 +2026-01-01T00:00 E1 + +-- !select_snapshot_timestamptz_pk -- +2024-01-01T00:00 A1 +2024-06-15T12:00:00.123456 B1 +2025-01-01T00:00 C1 +2025-06-15T12:34:56.999999 D1 +2026-01-01T00:00 E1 + +-- !select_snapshot_composite_pk -- +2024-02-01T00:00 1 A2 +2024-02-01T00:00 2 B2 +2024-02-02T12:00:00.500 3 C2 +2024-02-03T23:59:59.999999 4 D2 +2024-02-04T00:00 5 E2 + +-- !select_after_incr_timestamp_pk -- +2024-01-01T00:00 A1 +2024-06-15T12:00:00.123456 B2_upd +2025-01-01T00:00 C1 +2026-01-01T00:00 E1 +2026-06-01T00:00 F2 + +-- !select_after_incr_timestamptz_pk -- +2024-01-01T00:00 A1 +2024-06-15T12:00:00.123456 B2_upd +2025-01-01T00:00 C1 +2026-01-01T00:00 E1 +2026-06-01T00:00 F2 + +-- !select_after_incr_composite_pk -- +2024-02-01T00:00 1 A2 +2024-02-01T00:00 2 B2 +2024-02-02T12:00:00.500 3 C3_upd +2024-02-04T00:00 5 E2 +2024-02-05T00:00 6 F3 diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy new file mode 100644 index 00000000000000..9e72372644ea30 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_jdbc_servertimezone.groovy @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Recommended end-to-end tz configuration: align jdbc_url's serverTimezone +// with Doris session time_zone, so Doris users see TIMESTAMP columns as +// wall-clock in the cluster's local tz. +// +// jdbc_url is built from the Doris session tz at runtime, so the case works +// on clusters configured with different default tz values without code +// changes. +// +// Setup: +// source SET SESSION time_zone='+01:00', INSERT '2024-06-15 11:00:00' +// ts0 (TIMESTAMP) -> source-internal UTC instant 2024-06-15 10:00:00Z +// dt0 (DATETIME) -> literal '2024-06-15 11:00:00' +// jdbc_url serverTimezone= +// +// Expectations at Doris (.out is pre-filled for the standard Doris default +// session time_zone '+08:00'): +// ts0 -> '2024-06-15T18:00' (UTC 10:00Z + 8h = 18:00 in +08) +// dt0 -> '2024-06-15T11:00' (DATETIME has no tz semantics, stored verbatim) +suite("test_streaming_mysql_job_jdbc_servertimezone", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_jdbc_servertimezone_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_mysql_jdbc_servertimezone" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // Read Doris session tz so the cdc job aligns with it. + def dorisTz = (sql "select @@time_zone")[0][0] + log.info("Doris session time_zone = ${dorisTz}; jdbc_url serverTimezone will use the same.") + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """ + create table ${mysqlDb}.${table1} ( + id int primary key, + tag varchar(32), + ts0 timestamp null, + dt0 datetime null + ) engine=innodb charset=utf8; + """ + + sql """SET SESSION time_zone = '+01:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'snapshot_plus01', + '2024-06-15 11:00:00', '2024-06-15 11:00:00')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=${dorisTz}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + log.info("snapshot row count: " + cnt) + cnt.get(0).get(0) == 1 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_desc """desc ${currentDb}.${table1};""" + qt_select_snapshot """select * from ${currentDb}.${table1} order by id;""" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """SET SESSION time_zone = '+01:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'binlog_plus01', + '2024-06-15 11:00:00', '2024-06-15 11:00:00')""" + } + + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + cnt.get(0).get(0) == 2 + } + ) + + qt_select_binlog """select * from ${currentDb}.${table1} order by id;""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name = '${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy new file mode 100644 index 00000000000000..27e62a487c239e --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_source_timezone.groovy @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Verify cdc tz handling when source session tz differs from cdc-client tz. +// +// MySQL TIMESTAMP stores a UTC instant; the session tz only affects how the +// wall clock is parsed on write and rendered on read. DATETIME / DATE are +// wall-clock verbatim, no tz transform. +// +// Coverage in one table: +// * source session tz set to +08 / -05 / +00, same wall clock written -> +// different UTC instants prove cdc honors source session tz (not a +// hardcoded offset, not the cdc-client JVM tz). +// * NULL row across every temporal column. +// * MySQL TIMESTAMP epoch lower bound ('1970-01-01 00:00:01Z'). +// * Binlog path mirrors snapshot themes, plus an UPDATE that rewrites a +// TIMESTAMP column under +08. +// +// jdbc_url uses serverTimezone=UTC so cdc renders TIMESTAMP back to UTC +// wall clock regardless of the source session offset. +suite("test_streaming_mysql_job_source_timezone", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_source_timezone_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_mysql_source_timezone" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """ + create table ${mysqlDb}.${table1} ( + id int primary key, + tag varchar(32), + ts0 timestamp null, + ts3 timestamp(3) null, + ts6 timestamp(6) null, + dt0 datetime null, + dt6 datetime(6) null, + d date null + ) engine=innodb charset=utf8; + """ + + // id=1: +08 baseline + sql """SET SESSION time_zone = '+08:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'snapshot_plus08', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123456', + '2024-06-15')""" + + // id=2: -05 -> same wall clock crosses to next UTC day (20:00 -05 = 01:00 next day UTC) + sql """SET SESSION time_zone = '-05:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (2, 'snapshot_minus05', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123456', + '2024-06-15')""" + + // id=3: +00 -> wall clock == UTC instant + sql """SET SESSION time_zone = '+00:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'snapshot_utc', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123456', + '2024-06-15')""" + + // id=4: NULL across every nullable column + sql """INSERT INTO ${mysqlDb}.${table1} (id, tag) VALUES (4, 'snapshot_null')""" + + // id=5: epoch lower bound. MySQL TIMESTAMP min is '1970-01-01 00:00:01' UTC, + // so under +08 the smallest legal wall clock is '1970-01-01 08:00:01'. + sql """SET SESSION time_zone = '+08:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (5, 'snapshot_epoch_plus08', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01.123', + '1970-01-01 08:00:01.123456', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01.123456', + '1970-01-01')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + log.info("snapshot row count: " + cnt) + cnt.get(0).get(0) == 5 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_desc """desc ${currentDb}.${table1};""" + qt_select_snapshot """select * from ${currentDb}.${table1} order by id;""" + + // Binlog phase: same tz themes through binlog, plus an UPDATE that + // rewrites ts0 on id=1 under +08 to confirm UPDATE follows the same + // tz codepath as INSERT. + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """SET SESSION time_zone = '+08:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (101, 'binlog_plus08', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123456', + '2024-06-15')""" + + sql """SET SESSION time_zone = '-05:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (102, 'binlog_minus05', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123456', + '2024-06-15')""" + + sql """SET SESSION time_zone = '+00:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (103, 'binlog_utc', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123456', + '2024-06-15')""" + + sql """SET SESSION time_zone = '+08:00'""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (105, 'binlog_epoch_plus08', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01.123', + '1970-01-01 08:00:01.123456', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01.123456', + '1970-01-01')""" + + // UPDATE: id=1 was '20:00 +08' (UTC 12:00). Push wall clock to '22:00 +08' + // (UTC 14:00) so we can poll for completion. + sql """UPDATE ${mysqlDb}.${table1} SET ts0 = '2024-06-15 22:00:00' WHERE id = 1""" + } + + // Wait for 4 binlog INSERTs + UPDATE on id=1 to settle. + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + if (cnt.get(0).get(0) != 9) return false + def updated = sql """select ts0 from ${currentDb}.${table1} where id = 1""" + return updated.get(0).get(0).toString().startsWith('2024-06-15T14:00') + } + ) + + qt_select_binlog """select * from ${currentDb}.${table1} order by id;""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name = '${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy new file mode 100644 index 00000000000000..61bd7b0b87c40d --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_timestamp_pk.groovy @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Cover TIMESTAMP chunk-key path. TIMESTAMP is the epoch+tz column type, +// driver returns java.time.LocalDateTime by default; chunk-bound JSON +// round-trip exercises the LocalDateTime branch in +// AbstractCdcSourceReader.convertBound. +// +// Both source session and jdbc_url are pinned to UTC so the TIMESTAMP wall +// clock written by INSERT equals the value cdc renders, and the .out can +// be filled deterministically. +suite("test_streaming_mysql_job_timestamp_pk", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_timestamp_pk_name" + def currentDb = (sql "select database()")[0][0] + def tableTs = "events_mysql_timestamp_pk" + def tableComposite = "events_mysql_timestamp_id_pk" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${tableTs} force""" + sql """drop table if exists ${currentDb}.${tableComposite} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """SET SESSION time_zone = '+00:00'""" + + sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableTs}""" + sql """CREATE TABLE ${mysqlDb}.${tableTs} ( + `event_ts` timestamp(6) NOT NULL, + `payload` varchar(64), + PRIMARY KEY (`event_ts`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2024-01-01 00:00:00.000000', 'A1')""" + sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2024-06-15 12:00:00.123456', 'B1')""" + sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2025-01-01 00:00:00.000000', 'C1')""" + sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2025-06-15 12:34:56.999999', 'D1')""" + sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2026-01-01 00:00:00.000000', 'E1')""" + + sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}""" + sql """CREATE TABLE ${mysqlDb}.${tableComposite} ( + `event_ts` timestamp(6) NOT NULL, + `id` int NOT NULL, + `payload` varchar(64), + PRIMARY KEY (`event_ts`, `id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES ('2024-02-01 00:00:00.000000', 1, 'A2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES ('2024-02-01 00:00:00.000000', 2, 'B2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES ('2024-02-02 12:00:00.500000', 3, 'C2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES ('2024-02-03 23:59:59.999999', 4, 'D2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES ('2024-02-04 00:00:00.000000', 5, 'E2')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${tableTs},${tableComposite}", + "offset" = "initial", + "snapshot_split_size" = "2" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def c1 = sql """select count(1) from ${currentDb}.${tableTs}""" + def c2 = sql """select count(1) from ${currentDb}.${tableComposite}""" + log.info("snapshot row count ts=${c1} composite=${c2}") + c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_select_snapshot_timestamp_pk """select event_ts, payload from ${currentDb}.${tableTs} order by event_ts asc""" + qt_select_snapshot_composite_pk """select event_ts, id, payload from ${currentDb}.${tableComposite} order by event_ts asc, id asc""" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}?serverTimezone=UTC") { + sql """SET SESSION time_zone = '+00:00'""" + + sql """INSERT INTO ${mysqlDb}.${tableTs} VALUES ('2026-06-01 00:00:00.000000', 'F2')""" + sql """UPDATE ${mysqlDb}.${tableTs} SET payload='B2_upd' WHERE event_ts='2024-06-15 12:00:00.123456'""" + sql """DELETE FROM ${mysqlDb}.${tableTs} WHERE event_ts='2025-06-15 12:34:56.999999'""" + + sql """INSERT INTO ${mysqlDb}.${tableComposite} VALUES ('2024-02-05 00:00:00.000000', 6, 'F3')""" + sql """UPDATE ${mysqlDb}.${tableComposite} SET payload='C3_upd' WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3""" + sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE event_ts='2024-02-03 23:59:59.999999' AND id=4""" + } + + try { + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def c1 = sql """select count(1) from ${currentDb}.${tableTs}""" + def c2 = sql """select count(1) from ${currentDb}.${tableComposite}""" + def upd1 = sql """select payload from ${currentDb}.${tableTs} where event_ts='2024-06-15 12:00:00.123456'""" + def upd2 = sql """select payload from ${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and id=3""" + def del1 = sql """select count(1) from ${currentDb}.${tableTs} where event_ts='2025-06-15 12:34:56.999999'""" + def del2 = sql """select count(1) from ${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and id=4""" + def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0) + def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0) + log.info("incr ts=${c1} composite=${c2} ts_upd=${p1} comp_upd=${p2} ts_del=${del1} comp_del=${del2}") + c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 && + p1 == 'B2_upd' && p2 == 'C3_upd' && + del1.get(0).get(0) == 0 && del2.get(0).get(0) == 0 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job (incr): " + showjob) + log.info("show task (incr): " + showtask) + throw ex + } + + qt_select_after_incr_timestamp_pk """select event_ts, payload from ${currentDb}.${tableTs} order by event_ts asc""" + qt_select_after_incr_composite_pk """select event_ts, id, payload from ${currentDb}.${tableComposite} order by event_ts asc, id asc""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy new file mode 100644 index 00000000000000..f2838fd4f8ae3c --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_jdbc_servertimezone.groovy @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// PG counterpart of test_streaming_mysql_job_jdbc_servertimezone. +// +// Align jdbc_url's timezone with Doris session time_zone, read at runtime so +// the case works on clusters with any default tz. +// +// Setup: +// source SET TIME ZONE INTERVAL '+01:00' HOUR TO MINUTE +// INSERT '2024-06-15 11:00:00' +// ts (timestamp) -> literal '2024-06-15 11:00:00' +// tstz (timestamptz) -> source-internal UTC instant 2024-06-15 10:00:00Z +// jdbc_url timezone= +// +// Expectations at Doris (.out is pre-filled for Doris default session +// time_zone '+08:00'): +// ts -> '2024-06-15T11:00' (verbatim, no tz semantics) +// tstz -> '2024-06-15T18:00' (UTC 10:00Z + 8h = 18:00 in +08) +// +// timetz column is intentionally omitted -- it is blocked by the upstream +// cdc ZonedTime bug and already tracked in the main *_source_timezone case. +suite("test_streaming_postgres_job_jdbc_servertimezone", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_jdbc_servertimezone_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_pg_jdbc_servertimezone" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Read Doris session tz so the cdc job aligns with it. + def dorisTz = (sql "select @@time_zone")[0][0] + log.info("Doris session time_zone = ${dorisTz}; jdbc_url timezone will use the same.") + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """ + create table ${pgDB}.${pgSchema}.${table1} ( + id integer PRIMARY KEY, + tag varchar(32), + ts timestamp, + tstz timestamp with time zone + ); + """ + + sql """SET TIME ZONE INTERVAL '+01:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 'snapshot_plus01', + '2024-06-15 11:00:00', '2024-06-15 11:00:00')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=${dorisTz}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + log.info("snapshot row count: " + cnt) + cnt.get(0).get(0) == 1 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_desc """desc ${currentDb}.${table1};""" + qt_select_snapshot """select * from ${currentDb}.${table1} order by id;""" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """SET TIME ZONE INTERVAL '+01:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 'binlog_plus01', + '2024-06-15 11:00:00', '2024-06-15 11:00:00')""" + } + + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + cnt.get(0).get(0) == 2 + } + ) + + qt_select_binlog """select * from ${currentDb}.${table1} order by id;""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name = '${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy new file mode 100644 index 00000000000000..b1911e1cdb27de --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_source_timezone.groovy @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// PG counterpart of test_streaming_mysql_job_source_timezone. +// +// PG temporal semantics relevant to cdc tz handling: +// timestamp - wall clock, no tz; debezium emits epoch-style schema, cdc +// bypasses serverTimeZone. +// timestamptz - normalized to UTC on write per session TimeZone; debezium +// emits ZonedTimestamp ISO string, cdc renders it using +// serverTimeZone. +// timetz - time-of-day with offset; PG retains the session offset +// instead of normalizing to UTC. Debezium emits ZonedTime. +// (Note: cdc currently misses this case in convert(); the +// expected values below assume the upstream-fix behavior.) +// date - no tz; literal day, must not drift across +08 boundary. +// +// Coverage: +// * source session tz set to +08 / -05 / +00, same wall clock written -> +// different UTC instants for timestamptz prove cdc honors source session. +// * NULL row across every temporal column. +// * epoch lower bound ('1970-01-01 00:00:01Z'). +// * Binlog path mirrors snapshot themes, plus an UPDATE on tstz0 under +08. +// +// jdbc_url uses timezone=UTC so cdc renders timestamptz back to UTC wall +// clock regardless of the source session offset. +suite("test_streaming_postgres_job_source_timezone", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_source_timezone_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_pg_source_timezone" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """ + create table ${pgDB}.${pgSchema}.${table1} ( + id integer PRIMARY KEY, + tag varchar(32), + ts timestamp, + tstz0 timestamptz(0), + tstz3 timestamptz(3), + tstz6 timestamptz(6), + ttz time with time zone, + d date + ); + """ + + // INTERVAL form avoids depending on the container's tzdata. + // id=1: +08 baseline + sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 'snapshot_plus08', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '20:00:00.123456', + '2024-06-15')""" + + // id=2: -05 -> same wall clock crosses to next UTC day for timestamptz + sql """SET TIME ZONE INTERVAL '-05:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2, 'snapshot_minus05', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '20:00:00.123456', + '2024-06-15')""" + + // id=3: +00 -> wall clock == UTC instant + sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3, 'snapshot_utc', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '20:00:00.123456', + '2024-06-15')""" + + // id=4: NULL across every nullable column + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (id, tag) VALUES (4, 'snapshot_null')""" + + // id=5: epoch lower bound under +08 + sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (5, 'snapshot_epoch_plus08', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01.123', + '1970-01-01 08:00:01.123456', + '08:00:01.123456', + '1970-01-01')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + log.info("snapshot row count: " + cnt) + cnt.get(0).get(0) == 5 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_desc """desc ${currentDb}.${table1};""" + qt_select_snapshot """select * from ${currentDb}.${table1} order by id;""" + + // Binlog phase: same tz themes, plus an UPDATE that rewrites tstz0 + // on id=1 under +08 to confirm UPDATE follows the same tz codepath. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (101, 'binlog_plus08', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '20:00:00.123456', + '2024-06-15')""" + + sql """SET TIME ZONE INTERVAL '-05:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (102, 'binlog_minus05', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '20:00:00.123456', + '2024-06-15')""" + + sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (103, 'binlog_utc', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00', + '2024-06-15 20:00:00.123', + '2024-06-15 20:00:00.123456', + '20:00:00.123456', + '2024-06-15')""" + + sql """SET TIME ZONE INTERVAL '+08:00' HOUR TO MINUTE""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (105, 'binlog_epoch_plus08', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01', + '1970-01-01 08:00:01.123', + '1970-01-01 08:00:01.123456', + '08:00:01.123456', + '1970-01-01')""" + + // UPDATE: id=1 tstz0 was '20:00 +08' (UTC 12:00). Push wall clock to + // '22:00 +08' (UTC 14:00) so we can poll for completion. + sql """UPDATE ${pgDB}.${pgSchema}.${table1} SET tstz0 = '2024-06-15 22:00:00' WHERE id = 1""" + } + + // Wait for 4 binlog INSERTs + UPDATE on id=1 to settle. + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def cnt = sql """select count(1) from ${currentDb}.${table1}""" + if (cnt.get(0).get(0) != 9) return false + def updated = sql """select tstz0 from ${currentDb}.${table1} where id = 1""" + return updated.get(0).get(0).toString().startsWith('2024-06-15T14:00') + } + ) + + qt_select_binlog """select * from ${currentDb}.${table1} order by id;""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name = '${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy new file mode 100644 index 00000000000000..02939cc230767d --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_timestamp_pk.groovy @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +// Cover PG timestamp / timestamptz chunk-key paths: timestamp (no tz, +// driver returns LocalDateTime), timestamptz (driver returns +// OffsetDateTime), plus a composite (timestamptz, id) PK to exercise +// multi-column locating. +// +// Source SET TIME ZONE = UTC and jdbc_url timezone=UTC so the timestamptz +// wall clock written by INSERT equals the value cdc renders, and the .out +// can be filled deterministically. +suite("test_streaming_postgres_job_timestamp_pk", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_timestamp_pk_name" + def currentDb = (sql "select database()")[0][0] + def tableTs = "events_pg_timestamp_pk" + def tableTstz = "events_pg_timestamptz_pk" + def tableComposite = "events_pg_timestamptz_id_pk" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${tableTs} force""" + sql """drop table if exists ${currentDb}.${tableTstz} force""" + sql """drop table if exists ${currentDb}.${tableComposite} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE""" + + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableTs}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableTs} ( + event_ts timestamp(6) NOT NULL, + payload varchar(64), + PRIMARY KEY (event_ts) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES ('2024-01-01 00:00:00.000000', 'A1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES ('2024-06-15 12:00:00.123456', 'B1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES ('2025-01-01 00:00:00.000000', 'C1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES ('2025-06-15 12:34:56.999999', 'D1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES ('2026-01-01 00:00:00.000000', 'E1')""" + + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableTstz}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableTstz} ( + event_ts timestamptz(6) NOT NULL, + payload varchar(64), + PRIMARY KEY (event_ts) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES ('2024-01-01 00:00:00.000000', 'A1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES ('2024-06-15 12:00:00.123456', 'B1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES ('2025-01-01 00:00:00.000000', 'C1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES ('2025-06-15 12:34:56.999999', 'D1')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES ('2026-01-01 00:00:00.000000', 'E1')""" + + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableComposite}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} ( + event_ts timestamptz(6) NOT NULL, + id integer NOT NULL, + payload varchar(64), + PRIMARY KEY (event_ts, id) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES ('2024-02-01 00:00:00.000000', 1, 'A2')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES ('2024-02-01 00:00:00.000000', 2, 'B2')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES ('2024-02-02 12:00:00.500000', 3, 'C2')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES ('2024-02-03 23:59:59.999999', 4, 'D2')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES ('2024-02-04 00:00:00.000000', 5, 'E2')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${tableTs},${tableTstz},${tableComposite}", + "offset" = "initial", + "snapshot_split_size" = "2" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(2, SECONDS).until( + { + def c1 = sql """select count(1) from ${currentDb}.${tableTs}""" + def c2 = sql """select count(1) from ${currentDb}.${tableTstz}""" + def c3 = sql """select count(1) from ${currentDb}.${tableComposite}""" + log.info("snapshot row count ts=${c1} tstz=${c2} composite=${c3}") + c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 && c3.get(0).get(0) == 5 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_select_snapshot_timestamp_pk """select event_ts, payload from ${currentDb}.${tableTs} order by event_ts asc""" + qt_select_snapshot_timestamptz_pk """select event_ts, payload from ${currentDb}.${tableTstz} order by event_ts asc""" + qt_select_snapshot_composite_pk """select event_ts, id, payload from ${currentDb}.${tableComposite} order by event_ts asc, id asc""" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """SET TIME ZONE INTERVAL '+00:00' HOUR TO MINUTE""" + + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTs} VALUES ('2026-06-01 00:00:00.000000', 'F2')""" + sql """UPDATE ${pgDB}.${pgSchema}.${tableTs} SET payload='B2_upd' WHERE event_ts='2024-06-15 12:00:00.123456'""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${tableTs} WHERE event_ts='2025-06-15 12:34:56.999999'""" + + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableTstz} VALUES ('2026-06-01 00:00:00.000000', 'F2')""" + sql """UPDATE ${pgDB}.${pgSchema}.${tableTstz} SET payload='B2_upd' WHERE event_ts='2024-06-15 12:00:00.123456'""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${tableTstz} WHERE event_ts='2025-06-15 12:34:56.999999'""" + + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} VALUES ('2024-02-05 00:00:00.000000', 6, 'F3')""" + sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload='C3_upd' WHERE event_ts='2024-02-02 12:00:00.500000' AND id=3""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE event_ts='2024-02-03 23:59:59.999999' AND id=4""" + } + + try { + Awaitility.await().atMost(180, SECONDS) + .pollInterval(2, SECONDS).until( + { + def c1 = sql """select count(1) from ${currentDb}.${tableTs}""" + def c2 = sql """select count(1) from ${currentDb}.${tableTstz}""" + def c3 = sql """select count(1) from ${currentDb}.${tableComposite}""" + def upd1 = sql """select payload from ${currentDb}.${tableTs} where event_ts='2024-06-15 12:00:00.123456'""" + def upd2 = sql """select payload from ${currentDb}.${tableTstz} where event_ts='2024-06-15 12:00:00.123456'""" + def upd3 = sql """select payload from ${currentDb}.${tableComposite} where event_ts='2024-02-02 12:00:00.500000' and id=3""" + def del1 = sql """select count(1) from ${currentDb}.${tableTs} where event_ts='2025-06-15 12:34:56.999999'""" + def del2 = sql """select count(1) from ${currentDb}.${tableTstz} where event_ts='2025-06-15 12:34:56.999999'""" + def del3 = sql """select count(1) from ${currentDb}.${tableComposite} where event_ts='2024-02-03 23:59:59.999999' and id=4""" + def p1 = upd1.size() == 0 ? null : upd1.get(0).get(0) + def p2 = upd2.size() == 0 ? null : upd2.get(0).get(0) + def p3 = upd3.size() == 0 ? null : upd3.get(0).get(0) + log.info("incr ts=${c1} tstz=${c2} comp=${c3} ts_upd=${p1} tstz_upd=${p2} comp_upd=${p3} dels=${del1}/${del2}/${del3}") + c1.get(0).get(0) == 5 && c2.get(0).get(0) == 5 && c3.get(0).get(0) == 5 && + p1 == 'B2_upd' && p2 == 'B2_upd' && p3 == 'C3_upd' && + del1.get(0).get(0) == 0 && del2.get(0).get(0) == 0 && del3.get(0).get(0) == 0 + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job (incr): " + showjob) + log.info("show task (incr): " + showtask) + throw ex + } + + qt_select_after_incr_timestamp_pk """select event_ts, payload from ${currentDb}.${tableTs} order by event_ts asc""" + qt_select_after_incr_timestamptz_pk """select event_ts, payload from ${currentDb}.${tableTstz} order by event_ts asc""" + qt_select_after_incr_composite_pk """select event_ts, id, payload from ${currentDb}.${tableComposite} order by event_ts asc, id asc""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + + def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'""" + assert jobCountRsp.get(0).get(0) == 0 + } +}