diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ca155662eb..b55f22d1aa 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -423,6 +423,7 @@ testcontainers-localstack = { module = "org.testcontainers:localstack" } testcontainers-rabbitmq = { module = "org.testcontainers:rabbitmq" } testcontainers-kafka = { module = "org.testcontainers:kafka" } testcontainers-mysql = { module = "org.testcontainers:mysql" } +testcontainers-clickhouse = { module = "org.testcontainers:clickhouse" } testcontainers-postgresql = { module = "org.testcontainers:postgresql" } testcontainers-mongodb = { module = "org.testcontainers:mongodb" } testcontainers-pulsar = { module = "org.testcontainers:pulsar" } @@ -467,7 +468,7 @@ debezium-connector-sqlserver = { module = "io.debezium:debezium-connector-sqlser # Database drivers postgresql-jdbc = "org.postgresql:postgresql:42.7.10" sqlite-jdbc = "org.xerial:sqlite-jdbc:3.47.1.0" -clickhouse-jdbc = "com.clickhouse:clickhouse-jdbc:0.4.6" +clickhouse-jdbc = "com.clickhouse:clickhouse-jdbc:0.9.8" mariadb-jdbc = "org.mariadb.jdbc:mariadb-java-client:3.5.5" mysql-connector-j = "com.mysql:mysql-connector-j:9.4.0" openmldb-jdbc = { module = "com.4paradigm.openmldb:openmldb-jdbc", version.ref = "openmldb" } diff --git a/jdbc/clickhouse/build.gradle.kts b/jdbc/clickhouse/build.gradle.kts index 7a36945ed9..dc0167e274 100644 --- a/jdbc/clickhouse/build.gradle.kts +++ b/jdbc/clickhouse/build.gradle.kts @@ -24,4 +24,9 @@ plugins { dependencies { implementation(project(":jdbc:pulsar-io-jdbc-core")) runtimeOnly(libs.clickhouse.jdbc) + + testImplementation(libs.testcontainers.clickhouse) + testImplementation(libs.pulsar.client) + testImplementation(libs.pulsar.functions.instance) + testImplementation(libs.avro) } diff --git a/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcSinkIntegrationTest.java b/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcSinkIntegrationTest.java new 
file mode 100644 index 0000000000..8269b4b523 --- /dev/null +++ b/jdbc/clickhouse/src/test/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcSinkIntegrationTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.jdbc; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import 
org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Integration test for the ClickHouse JDBC sink, exercised against a real ClickHouse server + * via Testcontainers. + * + *
<p>This test reproduces and guards against issue #32: with the old + * ClickHouse JDBC driver (0.4.6), {@code DatabaseMetaData.getTables()} returns no rows for an + * existing table on recent ClickHouse servers, so {@link JdbcUtils#getTableId} throws + * {@code "Not able to find table: pulsar_messages"} during {@code open()}. With the upgraded + * driver (0.9.8) the table is discovered and the sink writes successfully. + * + *
<p>Because the failure is driver-side, this test is RED on driver 0.4.6 and GREEN on 0.9.8.
+ * The table is created and the results are verified over ClickHouse's HTTP interface rather than
+ * over JDBC, so the driver under test is exercised only by the sink itself — keeping the
+ * reproduction faithful to the {@code getTableId} failure in #32.
+ */
+@Slf4j
+public class ClickHouseJdbcSinkIntegrationTest {
+
+ private static final DockerImageName CLICKHOUSE_IMAGE =
+ DockerImageName.parse("clickhouse/clickhouse-server:24.3");
+
+ private static final String TABLE_NAME = "pulsar_messages";
+
+ private ClickHouseContainer clickhouse;
+ private BaseJdbcAutoSchemaSink jdbcSink;
+
+ /**
+ * Lombok-backed payload class whose fields mirror, in order, the {@link #TABLE_NAME} columns.
+ */
+ @Data
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class Foo {
+ private String field1; // column field1 (String)
+ private String field2; // column field2 (String)
+ private int field3; // column field3 (Int32); also the table's ORDER BY key
+ }
+
+ @BeforeClass(alwaysRun = true)
+ public void setUp() throws Exception {
+ clickhouse = new ClickHouseContainer(CLICKHOUSE_IMAGE);
+ clickhouse.start();
+
+ // Create the destination table over the HTTP interface (ClickHouse requires an explicit
+ // engine). Deliberately not using JDBC here so the driver under test is exercised only by
+ // the sink.
+ httpQuery("CREATE TABLE " + TABLE_NAME + " ("
+ + " field1 String,"
+ + " field2 String,"
+ + " field3 Int32"
+ + ") ENGINE = MergeTree ORDER BY field3");
+
+ Map