From 9e060532f267eae7c396b9ea5382b45660437189 Mon Sep 17 00:00:00 2001 From: Dream95 Date: Wed, 6 May 2026 09:38:59 +0800 Subject: [PATCH 1/3] [feat][io] Add MQTT sink connector Signed-off-by: Dream95 --- README.md | 1 + distribution/io/build.gradle.kts | 1 + gradle/libs.versions.toml | 2 + mqtt/build.gradle.kts | 35 +++++ .../org/apache/pulsar/io/mqtt/MqttSink.java | 128 ++++++++++++++++ .../apache/pulsar/io/mqtt/MqttSinkConfig.java | 118 +++++++++++++++ .../META-INF/services/pulsar-io.yaml | 22 +++ .../pulsar/io/mqtt/MqttSinkConfigTest.java | 118 +++++++++++++++ .../apache/pulsar/io/mqtt/MqttSinkTest.java | 142 ++++++++++++++++++ mqtt/src/test/resources/sinkConfig.yaml | 29 ++++ settings.gradle.kts | 1 + 11 files changed, 597 insertions(+) create mode 100644 mqtt/build.gradle.kts create mode 100644 mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java create mode 100644 mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java create mode 100644 mqtt/src/main/resources/META-INF/services/pulsar-io.yaml create mode 100644 mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java create mode 100644 mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java create mode 100644 mqtt/src/test/resources/sinkConfig.yaml diff --git a/README.md b/README.md index ea0694f752..174bf57e4f 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,7 @@ mounting them into the `apachepulsar/pulsar` Docker image. | Kafka | Apache Kafka | | Kinesis | Amazon Kinesis Data Streams | | MongoDB | MongoDB | +| MQTT | MQTT broker | | Redis | Redis | | Solr | Apache Solr | diff --git a/distribution/io/build.gradle.kts b/distribution/io/build.gradle.kts index 9aa7b81470..66f3a6c7fe 100644 --- a/distribution/io/build.gradle.kts +++ b/distribution/io/build.gradle.kts @@ -65,6 +65,7 @@ dependencies { connectorNars(project(":influxdb")) connectorNars(project(":redis")) connectorNars(project(":solr")) + connectorNars(project(":mqtt")) connectorNars(project(":dynamodb")) connectorNars(project(":alluxio")) connectorNars(project(":azure-data-explorer")) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e397b65b12..ca155662eb 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -176,6 +176,7 @@ aerospike-client = "4.5.0" aws-sdk = "1.12.788" aws-sdk2 = "2.32.28" rabbitmq-client = "5.28.0" +hivemq-mqtt-client = "1.3.13" cassandra-driver = "3.11.2" mongodb-driver = "5.4.0" influxdb-client = "7.3.0" @@ -494,6 +495,7 @@ solr-test-framework = { module = "org.apache.solr:solr-test-framework", version. solr-core = { module = "org.apache.solr:solr-core", version.ref = "solr" } # Messaging rabbitmq-amqp-client = { module = "com.rabbitmq:amqp-client", version.ref = "rabbitmq-client" } +hivemq-mqtt-client = { module = "com.hivemq:hivemq-mqtt-client", version.ref = "hivemq-mqtt-client" } nsq-j = { module = "com.sproutsocial:nsq-j", version.ref = "nsq-client" } # Time series influxdb-client-java = { module = "com.influxdb:influxdb-client-java", version.ref = "influxdb-client" } diff --git a/mqtt/build.gradle.kts b/mqtt/build.gradle.kts new file mode 100644 index 0000000000..343be0c079 --- /dev/null +++ b/mqtt/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("pulsar-connectors.java-conventions") + id("pulsar-connectors.nar-conventions") +} + +dependencies { + implementation(libs.pulsar.io.common) + implementation(libs.pulsar.io.core) + implementation(libs.pulsar.functions.instance) + implementation(libs.jackson.databind) + implementation(libs.jackson.dataformat.yaml) + implementation(libs.commons.lang3) + implementation(libs.hivemq.mqtt.client) + + testImplementation(libs.testcontainers) +} diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java new file mode 100644 index 0000000000..2331147d78 --- /dev/null +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.Connector; +import org.apache.pulsar.io.core.annotations.IOType; + +@Connector( + name = "mqtt-sink", + type = IOType.SINK, + help = "A sink connector that moves messages from Pulsar to MQTT.", + configClass = MqttSinkConfig.class +) +@Slf4j +public class MqttSink implements Sink { + + private MqttSinkConfig mqttSinkConfig; + private Mqtt5AsyncClient mqttClient; + private MqttQos mqttQos; + + @Override + public void open(Map config, SinkContext sinkContext) throws Exception { + mqttSinkConfig = MqttSinkConfig.load(config, sinkContext); + mqttSinkConfig.validate(); + mqttQos = MqttQos.fromCode(mqttSinkConfig.getQos()); + + var builder = MqttClient.builder() + .useMqttVersion5() + .serverHost(mqttSinkConfig.getServerHost()) + .serverPort(mqttSinkConfig.getServerPort()); + + if (StringUtils.isNotBlank(mqttSinkConfig.getClientId())) { + builder = builder.identifier(mqttSinkConfig.getClientId()); + } + if (mqttSinkConfig.isSslEnabled()) { + builder = builder.sslWithDefaultConfig(); + } + + mqttClient = builder.buildAsync(); + if (StringUtils.isNotBlank(mqttSinkConfig.getUsername())) { + var authBuilder = mqttClient.connectWith() + .cleanStart(mqttSinkConfig.isCleanStart()) + .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec()) + .simpleAuth() + .username(mqttSinkConfig.getUsername()); + if (mqttSinkConfig.getPassword() != null) { + authBuilder = authBuilder.password(mqttSinkConfig.getPassword().getBytes(StandardCharsets.UTF_8)); + } + authBuilder.applySimpleAuth() + .send() + .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } else { + mqttClient.connectWith() + .cleanStart(mqttSinkConfig.isCleanStart()) + .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec()) + .send() + .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } + log.info("MQTT sink connected to {}:{}.", + mqttSinkConfig.getServerHost(), mqttSinkConfig.getServerPort()); + } + + @Override + public void write(Record record) { + try { + byte[] payload = record.getValue() == null ? new byte[0] : record.getValue(); + mqttClient.publishWith() + .topic(mqttSinkConfig.getTopic()) + .qos(mqttQos) + .payload(payload) + .send() + .whenComplete((result, throwable) -> { + if (throwable == null) { + record.ack(); + } else { + record.fail(); + log.warn("Failed to publish message to MQTT topic {}", + mqttSinkConfig.getTopic(), throwable); + } + }); + } catch (Exception e) { + record.fail(); + log.warn("Failed to schedule MQTT publish for topic {}", mqttSinkConfig.getTopic(), e); + } + } + + @Override + public void close() { + if (mqttClient == null) { + return; + } + + try { + mqttClient.disconnectWith() + .send() + .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.warn("Failed to disconnect MQTT client cleanly", e); + } + } +} diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java new file mode 100644 index 0000000000..4ed1cd927a --- /dev/null +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +@Data +@Accessors(chain = true) +public class MqttSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The MQTT broker host.") + private String serverHost; + + @FieldDoc( + required = true, + defaultValue = "1883", + help = "The MQTT broker port.") + private int serverPort = 1883; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The MQTT topic to publish messages to.") + private String topic; + + @FieldDoc( + defaultValue = "", + help = "MQTT client id used for the broker connection.") + private String clientId; + + @FieldDoc( + defaultValue = "", + sensitive = true, + help = "MQTT username.") + private String username; + + @FieldDoc( + defaultValue = "", + sensitive = true, + help = "MQTT password.") + private String password; + + @FieldDoc( + defaultValue = "0", + help = "MQTT QoS level for outgoing messages. Valid values: 0, 1, 2.") + private int qos = 0; + + @FieldDoc( + defaultValue = "60", + help = "MQTT keep alive interval in seconds.") + private int keepAliveIntervalSec = 60; + + @FieldDoc( + defaultValue = "10000", + help = "Timeout in milliseconds for MQTT connect/disconnect operations.") + private long connectionTimeoutMs = 10000L; + + @FieldDoc( + defaultValue = "true", + help = "Whether to start with a clean session.") + private boolean cleanStart = true; + + @FieldDoc( + defaultValue = "false", + help = "Enable SSL/TLS with the client default SSL configuration.") + private boolean sslEnabled = false; + + public static MqttSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), MqttSinkConfig.class); + } + + public static MqttSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, MqttSinkConfig.class, sinkContext); + } + + public void validate() { + Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), "serverHost cannot be blank"); + Preconditions.checkArgument(serverPort > 0, "serverPort must be a positive integer"); + Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); + Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2"); + Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0"); + Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0"); + } +} diff --git a/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000000..d56ad14364 --- /dev/null +++ b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +name: mqtt-sink +description: MQTT sink connector +sinkClass: org.apache.pulsar.io.mqtt.MqttSink +sinkConfigClass: org.apache.pulsar.io.mqtt.MqttSinkConfig diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java new file mode 100644 index 0000000000..e341f6ec8f --- /dev/null +++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +public class MqttSinkConfigTest { + + @Test + public void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + MqttSinkConfig config = MqttSinkConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + assertEquals(config.getServerHost(), "localhost"); + assertEquals(config.getServerPort(), 1883); + assertEquals(config.getTopic(), "test/topic"); + assertEquals(config.getClientId(), "pulsar-mqtt-test"); + assertEquals(config.getUsername(), "mqtt-user"); + assertEquals(config.getPassword(), "mqtt-password"); + assertEquals(config.getQos(), 1); + assertEquals(config.getKeepAliveIntervalSec(), 45); + assertEquals(config.getConnectionTimeoutMs(), 15000L); + assertTrue(config.isCleanStart()); + assertFalse(config.isSslEnabled()); + } + + @Test + public void loadFromMapTest() throws IOException { + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), sinkContext); + + assertNotNull(config); + assertEquals(config.getServerHost(), "localhost"); + assertEquals(config.getServerPort(), 1883); + assertEquals(config.getTopic(), "test/topic"); + assertEquals(config.getClientId(), "pulsar-mqtt-test"); + assertEquals(config.getQos(), 1); + } + + @Test + public void loadFromMapCredentialsFromSecretTest() throws IOException { + Map map = baseConfigMap(); + map.remove("username"); + map.remove("password"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("username")).thenReturn("mqtt-user"); + Mockito.when(sinkContext.getSecret("password")).thenReturn("mqtt-password"); + + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + assertEquals(config.getUsername(), "mqtt-user"); + assertEquals(config.getPassword(), "mqtt-password"); + } + + @Test + public void validValidateTest() throws IOException { + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), sinkContext); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "qos must be one of 0, 1, 2") + public void invalidQosValidateTest() throws IOException { + Map map = baseConfigMap(); + map.put("qos", 3); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + config.validate(); + } + + private static Map baseConfigMap() { + Map map = new HashMap<>(); + map.put("serverHost", "localhost"); + map.put("serverPort", 1883); + map.put("topic", "test/topic"); + map.put("clientId", "pulsar-mqtt-test"); + map.put("username", "mqtt-user"); + map.put("password", "mqtt-password"); + map.put("qos", 1); + map.put("keepAliveIntervalSec", 45); + map.put("connectionTimeoutMs", 15000); + map.put("cleanStart", true); + map.put("sslEnabled", false); + return map; + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } +} diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java new file mode 100644 index 0000000000..ba9a1e1332 --- /dev/null +++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MqttSinkTest { + + private static final int MQTT_PORT = 1883; + private static final String TEST_TOPIC = "pulsar/mqtt/e2e"; + private static final DockerImageName MOSQUITTO_IMAGE = DockerImageName.parse("eclipse-mosquitto:2"); + + private final GenericContainer mqttContainer = new GenericContainer<>(MOSQUITTO_IMAGE) + .withExposedPorts(MQTT_PORT); + + @BeforeClass(alwaysRun = true) + public void beforeClass() { + mqttContainer.start(); + } + + @AfterClass(alwaysRun = true) + public void afterClass() { + mqttContainer.stop(); + } + + @Test + public void testWriteE2EWithMosquitto() throws Exception { + BlockingQueue receivedPayloads = new LinkedBlockingQueue<>(); + CountDownLatch ackLatch = new CountDownLatch(3); + AtomicBoolean failCalled = new AtomicBoolean(false); + + Mqtt5AsyncClient subscriber = MqttClient.builder() + .useMqttVersion5() + .serverHost(mqttContainer.getHost()) + .serverPort(mqttContainer.getMappedPort(MQTT_PORT)) + .identifier("mqtt-sink-e2e-subscriber") + .buildAsync(); + + subscriber.connectWith() + .cleanStart(true) + .send() + .get(10, TimeUnit.SECONDS); + subscriber.subscribeWith() + .topicFilter(TEST_TOPIC) + .callback(publish -> receivedPayloads.add( + new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8))) + .send() + .get(10, TimeUnit.SECONDS); + + Map config = new HashMap<>(); + config.put("serverHost", mqttContainer.getHost()); + config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT)); + config.put("topic", TEST_TOPIC); + config.put("qos", 1); + config.put("connectionTimeoutMs", 10000); + config.put("clientId", "mqtt-sink-e2e-publisher"); + + SinkContext sinkContext = mock(SinkContext.class); + try (MqttSink sink = new MqttSink()) { + sink.open(config, sinkContext); + + for (int i = 0; i < 3; i++) { + sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled)); + } + + assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()"); + assertFalse(failCalled.get(), "record.fail() should not be called on successful publish"); + + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2"); + } finally { + subscriber.disconnectWith() + .sessionExpiryInterval(0) + .send() + .get(10, TimeUnit.SECONDS); + } + } + + private static final class TestRecord implements Record { + private final byte[] value; + private final CountDownLatch ackLatch; + private final AtomicBoolean failCalled; + + private TestRecord(byte[] value, CountDownLatch ackLatch, AtomicBoolean failCalled) { + this.value = value; + this.ackLatch = ackLatch; + this.failCalled = failCalled; + } + + @Override + public byte[] getValue() { + return value; + } + + @Override + public void ack() { + ackLatch.countDown(); + } + + @Override + public void fail() { + failCalled.set(true); + } + } +} diff --git a/mqtt/src/test/resources/sinkConfig.yaml b/mqtt/src/test/resources/sinkConfig.yaml new file mode 100644 index 0000000000..69eb3f7ee5 --- /dev/null +++ b/mqtt/src/test/resources/sinkConfig.yaml @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +serverHost: localhost +serverPort: 1883 +topic: test/topic +clientId: pulsar-mqtt-test +username: mqtt-user +password: mqtt-password +qos: 1 +keepAliveIntervalSec: 45 +connectionTimeoutMs: 15000 +cleanStart: true +sslEnabled: false diff --git a/settings.gradle.kts b/settings.gradle.kts index d186f01b9c..cdef210517 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -86,6 +86,7 @@ include("nsq") include("rabbitmq") include("redis") include("solr") +include("mqtt") // JDBC — parent + sub-modules with qualified names to avoid clashes with debezium include("jdbc") From 85364cd5f96b2bd46bd47552710008f46609c62e Mon Sep 17 00:00:00 2001 From: Dream95 Date: Sat, 9 May 2026 16:25:10 +0800 Subject: [PATCH 2/3] [fix][io] Stabilize MQTT sink tests and split integration coverage Signed-off-by: Dream95 --- mqtt/build.gradle.kts | 20 ++- .../io/mqtt/MqttSinkIntegrationTest.java | 150 ++++++++++++++++++ .../org/apache/pulsar/io/mqtt/MqttSink.java | 9 +- .../apache/pulsar/io/mqtt/MqttSinkConfig.java | 2 + .../META-INF/services/pulsar-io.yaml | 2 +- .../pulsar/io/mqtt/MqttSinkConfigTest.java | 23 ++- .../apache/pulsar/io/mqtt/MqttSinkTest.java | 127 +++++++-------- 7 files changed, 255 insertions(+), 78 deletions(-) create mode 100644 mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java diff --git a/mqtt/build.gradle.kts b/mqtt/build.gradle.kts index 343be0c079..671a8380a3 100644 --- a/mqtt/build.gradle.kts +++ b/mqtt/build.gradle.kts @@ -22,6 +22,15 @@ plugins { id("pulsar-connectors.nar-conventions") } +val integrationTest by sourceSets.creating { + compileClasspath += sourceSets.main.get().output + configurations.testCompileClasspath.get() + runtimeClasspath += output + sourceSets.main.get().output + configurations.testRuntimeClasspath.get() + resources.srcDir(rootProject.file("gradle/test-resources")) +} + +configurations[integrationTest.implementationConfigurationName].extendsFrom(configurations.testImplementation.get()) +configurations[integrationTest.runtimeOnlyConfigurationName].extendsFrom(configurations.testRuntimeOnly.get()) + dependencies { implementation(libs.pulsar.io.common) implementation(libs.pulsar.io.core) @@ -29,7 +38,16 @@ dependencies { implementation(libs.jackson.databind) implementation(libs.jackson.dataformat.yaml) implementation(libs.commons.lang3) + implementation(libs.guava) implementation(libs.hivemq.mqtt.client) - testImplementation(libs.testcontainers) + add(integrationTest.implementationConfigurationName, libs.testcontainers) +} + +tasks.register("integrationTest") { + description = "Runs MQTT integration tests that require Docker." + group = LifecycleBasePlugin.VERIFICATION_GROUP + testClassesDirs = integrationTest.output.classesDirs + classpath = integrationTest.runtimeClasspath + shouldRunAfter("test") } diff --git a/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java b/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java new file mode 100644 index 0000000000..85b34c4e76 --- /dev/null +++ b/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MqttSinkIntegrationTest { + + private static final Logger log = LoggerFactory.getLogger(MqttSinkIntegrationTest.class); + + private static final int MQTT_PORT = 1883; + private static final String TEST_TOPIC = "pulsar/mqtt/e2e"; + private static final DockerImageName MOSQUITTO_IMAGE = DockerImageName.parse("eclipse-mosquitto:2"); + + private final GenericContainer mqttContainer = new GenericContainer<>(MOSQUITTO_IMAGE) + .withExposedPorts(MQTT_PORT); + + @BeforeClass(alwaysRun = true) + public void beforeClass() { + mqttContainer.start(); + } + + @AfterClass(alwaysRun = true) + public void afterClass() { + mqttContainer.stop(); + } + + @Test + public void testWriteE2EWithMosquitto() throws Exception { + BlockingQueue receivedPayloads = new LinkedBlockingQueue<>(); + CountDownLatch ackLatch = new CountDownLatch(3); + AtomicBoolean failCalled = new AtomicBoolean(false); + + Mqtt5AsyncClient subscriber = MqttClient.builder() + .useMqttVersion5() + .serverHost(mqttContainer.getHost()) + .serverPort(mqttContainer.getMappedPort(MQTT_PORT)) + .identifier("mqtt-sink-e2e-subscriber") + .buildAsync(); + + subscriber.connectWith() + .cleanStart(true) + .send() + .get(10, TimeUnit.SECONDS); + subscriber.subscribeWith() + .topicFilter(TEST_TOPIC) + .callback(publish -> receivedPayloads.add( + new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8))) + .send() + .get(10, TimeUnit.SECONDS); + + Map config = new HashMap<>(); + config.put("serverHost", mqttContainer.getHost()); + config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT)); + config.put("topic", TEST_TOPIC); + config.put("qos", 1); + config.put("connectionTimeoutMs", 10000); + config.put("clientId", "mqtt-sink-e2e-publisher"); + + SinkContext sinkContext = mock(SinkContext.class); + try (MqttSink sink = new MqttSink()) { + sink.open(config, sinkContext); + + for (int i = 0; i < 3; i++) { + sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled)); + } + + assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()"); + assertFalse(failCalled.get(), "record.fail() should not be called on successful publish"); + + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2"); + } finally { + try { + subscriber.disconnectWith() + .sessionExpiryInterval(0) + .send() + .get(10, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("Failed to disconnect MQTT subscriber in test cleanup", e); + } + } + } + + private static final class TestRecord implements Record { + private final byte[] value; + private final CountDownLatch ackLatch; + private final AtomicBoolean failCalled; + + private TestRecord(byte[] value, CountDownLatch ackLatch, AtomicBoolean failCalled) { + this.value = value; + this.ackLatch = ackLatch; + this.failCalled = failCalled; + } + + @Override + public byte[] getValue() { + return value; + } + + @Override + public void ack() { + ackLatch.countDown(); + } + + @Override + public void fail() { + failCalled.set(true); + } + } +} diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java index 2331147d78..50570979b7 100644 --- a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java @@ -21,6 +21,7 @@ import com.hivemq.client.mqtt.MqttClient; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -33,7 +34,7 @@ import org.apache.pulsar.io.core.annotations.IOType; @Connector( - name = "mqtt-sink", + name = "mqtt", type = IOType.SINK, help = "A sink connector that moves messages from Pulsar to MQTT.", configClass = MqttSinkConfig.class @@ -63,7 +64,7 @@ public void open(Map config, SinkContext sinkContext) throws Exc builder = builder.sslWithDefaultConfig(); } - mqttClient = builder.buildAsync(); + mqttClient = buildClient(builder); if (StringUtils.isNotBlank(mqttSinkConfig.getUsername())) { var authBuilder = mqttClient.connectWith() .cleanStart(mqttSinkConfig.isCleanStart()) @@ -87,6 +88,10 @@ public void open(Map config, SinkContext sinkContext) throws Exc mqttSinkConfig.getServerHost(), mqttSinkConfig.getServerPort()); } + Mqtt5AsyncClient buildClient(Mqtt5ClientBuilder builder) { + return builder.buildAsync(); + } + @Override public void write(Record record) { try { diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java index 4ed1cd927a..44a1bbc543 100644 --- a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java @@ -111,6 +111,8 @@ public void validate() { Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), "serverHost cannot be blank"); Preconditions.checkArgument(serverPort > 0, "serverPort must be a positive integer"); Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); + Preconditions.checkArgument(StringUtils.isNotBlank(username) || StringUtils.isBlank(password), + "password cannot be set when username is blank"); Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2"); Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0"); Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0"); diff --git a/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml index d56ad14364..b93d81afac 100644 --- a/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. # -name: mqtt-sink +name: mqtt description: MQTT sink connector sinkClass: org.apache.pulsar.io.mqtt.MqttSink sinkConfigClass: org.apache.pulsar.io.mqtt.MqttSinkConfig diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java index e341f6ec8f..0707f0a788 100644 --- a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java +++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java @@ -24,8 +24,12 @@ import static org.testng.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import org.apache.pulsar.io.core.SinkContext; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -33,7 +37,7 @@ public class MqttSinkConfigTest { @Test - public void loadFromYamlFileTest() throws IOException { + public void loadFromYamlFileTest() throws Exception { File yamlFile = getFile("sinkConfig.yaml"); MqttSinkConfig config = MqttSinkConfig.load(yamlFile.getAbsolutePath()); assertNotNull(config); @@ -95,6 +99,16 @@ public void invalidQosValidateTest() throws IOException { config.validate(); } + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "password cannot be set when username is blank") + public void passwordWithoutUsernameValidateTest() throws IOException { + Map map = baseConfigMap(); + map.put("username", ""); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + config.validate(); + } + private static Map baseConfigMap() { Map map = new HashMap<>(); map.put("serverHost", "localhost"); @@ -111,8 +125,9 @@ private static Map baseConfigMap() { return map; } - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); + private File getFile(String name) throws URISyntaxException { + URL resource = Objects.requireNonNull(getClass().getClassLoader().getResource(name), + "Missing test resource: " + name); + return Paths.get(resource.toURI()).toFile(); } } diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java index ba9a1e1332..326d5a9751 100644 --- a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java +++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java @@ -18,99 +18,82 @@ */ package org.apache.pulsar.io.mqtt; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; -import com.hivemq.client.mqtt.MqttClient; import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; -import java.nio.charset.StandardCharsets; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.SinkContext; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.utility.DockerImageName; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; +import org.mockito.Mockito; import org.testng.annotations.Test; public class MqttSinkTest { - private static final int MQTT_PORT = 1883; - private static final String TEST_TOPIC = "pulsar/mqtt/e2e"; - private static final DockerImageName MOSQUITTO_IMAGE = DockerImageName.parse("eclipse-mosquitto:2"); + @Test + public void writeShouldCallFailWhenPublishThrowsSynchronously() { + Mqtt5AsyncClient mqttClient = mock(Mqtt5AsyncClient.class); + when(mqttClient.publishWith()).thenThrow(new RuntimeException("publish failed")); + MqttSink sink = newSinkWithOpenedClient(mqttClient); + TestRecord record = new TestRecord("x".getBytes(), new CountDownLatch(1), new AtomicBoolean(false)); - private final GenericContainer mqttContainer = new GenericContainer<>(MOSQUITTO_IMAGE) - .withExposedPorts(MQTT_PORT); + sink.write(record); - @BeforeClass(alwaysRun = true) - public void beforeClass() { - mqttContainer.start(); + assertTrue(record.isFailed(), "record.fail() should be called"); } - @AfterClass(alwaysRun = true) - public void afterClass() { - mqttContainer.stop(); + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "password cannot be set when username is blank") + public void openShouldPropagateConfigValidationFailure() throws Exception { + Map invalidConfig = baseConfigMap(); + invalidConfig.put("username", ""); + invalidConfig.put("password", "pwd"); + try (MqttSink sink = new MqttSink()) { + sink.open(invalidConfig, mock(SinkContext.class)); + } } @Test - public void testWriteE2EWithMosquitto() throws Exception { - BlockingQueue receivedPayloads = new LinkedBlockingQueue<>(); - CountDownLatch ackLatch = new CountDownLatch(3); - AtomicBoolean failCalled = new AtomicBoolean(false); - - Mqtt5AsyncClient subscriber = MqttClient.builder() - .useMqttVersion5() - .serverHost(mqttContainer.getHost()) - .serverPort(mqttContainer.getMappedPort(MQTT_PORT)) - .identifier("mqtt-sink-e2e-subscriber") - .buildAsync(); - - subscriber.connectWith() - .cleanStart(true) - .send() - .get(10, TimeUnit.SECONDS); - subscriber.subscribeWith() - .topicFilter(TEST_TOPIC) - .callback(publish -> receivedPayloads.add( - new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8))) - .send() - .get(10, TimeUnit.SECONDS); + public void closeShouldBeSafeWhenSinkWasNeverOpened() { + new MqttSink().close(); + } + + private MqttSink newSinkWithOpenedClient(Mqtt5AsyncClient mqttClient) { + try { + @SuppressWarnings("unchecked") + Mqtt5ConnectBuilder.Send> connectBuilder = + mock(Mqtt5ConnectBuilder.Send.class, Mockito.RETURNS_SELF); + when(mqttClient.connectWith()).thenReturn(connectBuilder); + when(connectBuilder.send()) + .thenReturn(CompletableFuture.completedFuture(null)); + + MqttSink sink = Mockito.spy(new MqttSink()); + doReturn(mqttClient).when(sink).buildClient(any()); + sink.open(baseConfigMap(), mock(SinkContext.class)); + return sink; + } catch (Exception e) { + throw new AssertionError("Failed to initialize MqttSink test fixture", e); + } + } + private static Map baseConfigMap() { Map config = new HashMap<>(); - config.put("serverHost", mqttContainer.getHost()); - config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT)); - config.put("topic", TEST_TOPIC); + config.put("serverHost", "localhost"); + config.put("serverPort", 1883); + config.put("topic", "test/topic"); config.put("qos", 1); - config.put("connectionTimeoutMs", 10000); - config.put("clientId", "mqtt-sink-e2e-publisher"); - - SinkContext sinkContext = mock(SinkContext.class); - try (MqttSink sink = new MqttSink()) { - sink.open(config, sinkContext); - - for (int i = 0; i < 3; i++) { - sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled)); - } - - assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()"); - assertFalse(failCalled.get(), "record.fail() should not be called on successful publish"); - - assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0"); - assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1"); - assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2"); - } finally { - subscriber.disconnectWith() - .sessionExpiryInterval(0) - .send() - .get(10, TimeUnit.SECONDS); - } + config.put("connectionTimeoutMs", 1000L); + config.put("keepAliveIntervalSec", 60); + config.put("cleanStart", true); + return config; } private static final class TestRecord implements Record { @@ -138,5 +121,9 @@ public void ack() { public void fail() { failCalled.set(true); } + + private boolean isFailed() { + return failCalled.get(); + } } } From 668eede72a194f5031d4279d076ac2c6f4763875 Mon Sep 17 00:00:00 2001 From: Dream95 Date: Sun, 10 May 2026 08:42:14 +0800 Subject: [PATCH 3/3] [fix][io] Handle disconnect interruption and correct serverPort FieldDoc Signed-off-by: Dream95 --- mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java | 3 +++ .../main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java index 50570979b7..cf7eee407a 100644 --- a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java @@ -126,6 +126,9 @@ public void close() { mqttClient.disconnectWith() .send() .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("MQTT disconnect was interrupted", e); } catch (Exception e) { log.warn("Failed to disconnect MQTT client cleanly", e); } diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java index 44a1bbc543..646462d0e4 100644 --- a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java +++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java @@ -45,7 +45,6 @@ public class MqttSinkConfig implements Serializable { private String serverHost; @FieldDoc( - required = true, defaultValue = "1883", help = "The MQTT broker port.") private int serverPort = 1883;