diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionMessageIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionMessageIT.java new file mode 100644 index 0000000000000..02aa773a8b735 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionMessageIT.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.local; + +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.session.subscription.SubscriptionTreeSession; +import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; +import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class}) +public class IoTDBSubscriptionMessageIT extends AbstractSubscriptionLocalIT { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @Ignore + @Test + public void testPullConsumerCommitAfterRemoveUserData() throws Exception { + final String topicName = "topic_remove_user_data"; + insertHistoricalData(0, 100); + createTopic(topicName); + + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionTreePullConsumer consumer = + new SubscriptionTreePullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c_remove_user_data") + .consumerGroupId("cg_remove_user_data") + .autoCommit(false) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + + final List messages = pollMessages(consumer); + Assert.assertFalse(messages.isEmpty()); + + for (final SubscriptionMessage message : messages) { + Assert.assertNotNull(message.getCommitContext()); + Assert.assertFalse(message.getResultSets().isEmpty()); + + message.removeUserData(); + + Assert.assertNotNull(message.getCommitContext()); + Assert.assertThrows(SubscriptionRuntimeException.class, message::getResultSets); + Assert.assertThrows(SubscriptionRuntimeException.class, message::getRecordTabletIterator); + message.removeUserData(); + } + + consumer.commitSync(messages); + consumer.unsubscribe(topicName); + } + } + + @Ignore + @Test + public void testPullConsumerAutoCommitStoresCommitContextsOnly() throws Exception { + final String topicName = "topic_auto_commit_context_only"; + insertHistoricalData(100, 200); + createTopic(topicName); + + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionTreePullConsumer consumer = + new SubscriptionTreePullConsumer.Builder() + .host(host) + .port(port) + .consumerId("c_auto_commit") + .consumerGroupId("cg_auto_commit") + .autoCommit(true) + .autoCommitIntervalMs(60_000L) + .buildPullConsumer()) { + consumer.open(); + consumer.subscribe(topicName); + + final List messages = pollMessages(consumer); + Assert.assertFalse(messages.isEmpty()); + messages.forEach(SubscriptionMessage::removeUserData); + + final SortedMap> uncommittedCommitContexts = + getUncommittedCommitContexts(consumer); + Assert.assertFalse(uncommittedCommitContexts.isEmpty()); + + final Object storedObject = + uncommittedCommitContexts.values().iterator().next().iterator().next(); + Assert.assertTrue(storedObject instanceof SubscriptionCommitContext); + Assert.assertFalse(storedObject instanceof SubscriptionMessage); + + consumer.unsubscribe(topicName); + } + } + + private void insertHistoricalData(final int start, final int end) { + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + for (int i = start; i < end; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s1) values (%s, 1)", i)); + } + session.executeNonQueryStatement("flush"); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void createTopic(final String topicName) { + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) { + session.open(); + session.createTopic(topicName); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private List pollMessages(final SubscriptionTreePullConsumer consumer) + throws Exception { + for (int i = 0; i < 10; ++i) { + final List messages = + consumer.poll(Duration.ofMillis(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS)); + if (!messages.isEmpty()) { + return messages; + } + LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); + } + fail("Failed to poll subscription messages within the expected timeout."); + throw new IllegalStateException("unreachable"); + } + + @SuppressWarnings("unchecked") + private SortedMap> getUncommittedCommitContexts( + final SubscriptionTreePullConsumer consumer) throws Exception { + final Field field = + SubscriptionTreePullConsumer.class + .getSuperclass() + .getDeclaredField("uncommittedCommitContexts"); + field.setAccessible(true); + return (SortedMap>) field.get(consumer); + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java index c290af2b67b98..e9fbb1672e563 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java @@ -1145,22 +1145,40 @@ private List pollTabletsInternal( /////////////////////////////// commit sync (ack & nack) /////////////////////////////// protected void ack(final Iterable messages) throws SubscriptionException { + ackCommitContexts(extractCommitContexts(messages)); + } + + protected void ackCommitContexts(final Iterable commitContexts) + throws SubscriptionException { + commit(commitContexts, false); + } + + private Iterable extractCommitContexts( + final Iterable messages) { + final List commitContexts = new ArrayList<>(); + for (final SubscriptionMessage message : messages) { + commitContexts.add(message.getCommitContext()); + } + return commitContexts; + } + + private void commit(final Iterable commitContexts, final boolean nack) + throws SubscriptionException { final Map> dataNodeIdToSubscriptionCommitContexts = new HashMap<>(); - for (final SubscriptionMessage message : messages) { + for (final SubscriptionCommitContext commitContext : commitContexts) { dataNodeIdToSubscriptionCommitContexts - .computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>()) - .add(message.getCommitContext()); + .computeIfAbsent(commitContext.getDataNodeId(), (id) -> new ArrayList<>()) + .add(commitContext); } for (final Entry> entry : dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), false); + commitInternal(entry.getKey(), entry.getValue(), nack); } } protected void nack(final Iterable messages) throws SubscriptionException { - final Map> dataNodeIdToSubscriptionCommitContexts = - new HashMap<>(); + final List commitContexts = new ArrayList<>(); for (final SubscriptionMessage message : messages) { // make every effort to delete stale intermediate file if (Objects.equals(SubscriptionMessageType.TS_FILE.getType(), message.getMessageType()) @@ -1172,29 +1190,18 @@ protected void nack(final Iterable messages) throws Subscri } catch (final Exception ignored) { } } - dataNodeIdToSubscriptionCommitContexts - .computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>()) - .add(message.getCommitContext()); - } - for (final Entry> entry : - dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), true); + commitContexts.add(message.getCommitContext()); } + commit(commitContexts, true); } private void nack(final List responses) throws SubscriptionException { - final Map> dataNodeIdToSubscriptionCommitContexts = - new HashMap<>(); + final List commitContexts = new ArrayList<>(); for (final SubscriptionPollResponse response : responses) { // there is no stale intermediate file here - dataNodeIdToSubscriptionCommitContexts - .computeIfAbsent(response.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>()) - .add(response.getCommitContext()); - } - for (final Entry> entry : - dataNodeIdToSubscriptionCommitContexts.entrySet()) { - commitInternal(entry.getKey(), entry.getValue(), true); + commitContexts.add(response.getCommitContext()); } + commit(commitContexts, true); } private void commitInternal( diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java index 0c7478fa64dfb..991857bc685ee 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.rpc.subscription.config.ConsumerConstant; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.util.CollectionUtils; @@ -64,7 +65,7 @@ public abstract class AbstractSubscriptionPullConsumer extends AbstractSubscript private final boolean autoCommit; private final long autoCommitIntervalMs; - private SortedMap> uncommittedMessages; + private SortedMap> uncommittedCommitContexts; private final AtomicBoolean isClosed = new AtomicBoolean(true); @@ -123,7 +124,7 @@ protected synchronized void open() throws SubscriptionException { // submit auto poll worker if enabling auto commit if (autoCommit) { - uncommittedMessages = new ConcurrentSkipListMap<>(); + uncommittedCommitContexts = new ConcurrentSkipListMap<>(); submitAutoCommitWorker(); } } @@ -201,9 +202,12 @@ protected List poll(final Set topicNames, final lon if (currentTimestamp % autoCommitIntervalMs == 0) { index -= 1; } - uncommittedMessages + uncommittedCommitContexts .computeIfAbsent(index, o -> new ConcurrentSkipListSet<>()) - .addAll(messages); + .addAll( + messages.stream() + .map(SubscriptionMessage::getCommitContext) + .collect(Collectors.toList())); } return messages; @@ -271,11 +275,11 @@ public void run() { index -= 1; } - for (final Map.Entry> entry : - uncommittedMessages.headMap(index).entrySet()) { + for (final Map.Entry> entry : + uncommittedCommitContexts.headMap(index).entrySet()) { try { - ack(entry.getValue()); - uncommittedMessages.remove(entry.getKey()); + ackCommitContexts(entry.getValue()); + uncommittedCommitContexts.remove(entry.getKey()); } catch (final Exception e) { LOGGER.warn("something unexpected happened when auto commit messages...", e); } @@ -284,10 +288,11 @@ public void run() { } private void commitAllUncommittedMessages() { - for (final Map.Entry> entry : uncommittedMessages.entrySet()) { + for (final Map.Entry> entry : + uncommittedCommitContexts.entrySet()) { try { - ack(entry.getValue()); - uncommittedMessages.remove(entry.getKey()); + ackCommitContexts(entry.getValue()); + uncommittedCommitContexts.remove(entry.getKey()); } catch (final Exception e) { LOGGER.warn("something unexpected happened when commit messages during close", e); } @@ -314,7 +319,7 @@ protected Map allReportMessage() { allReportMessage.put("autoCommit", String.valueOf(autoCommit)); allReportMessage.put("autoCommitIntervalMs", String.valueOf(autoCommitIntervalMs)); if (autoCommit) { - allReportMessage.put("uncommittedMessages", uncommittedMessages.toString()); + allReportMessage.put("uncommittedCommitContexts", uncommittedCommitContexts.toString()); } return allReportMessage; } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java index f41d34f0ab83a..b4ea6f0166f1b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java @@ -20,6 +20,7 @@ package org.apache.iotdb.session.subscription.payload; import org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandlerException; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; import org.apache.thrift.annotation.Nullable; @@ -39,6 +40,8 @@ public class SubscriptionMessage implements Comparable { private final SubscriptionMessageHandler handler; + private volatile boolean userDataRemoved = false; + public SubscriptionMessage( final SubscriptionCommitContext commitContext, final Map> tablets) { this.commitContext = commitContext; @@ -63,6 +66,17 @@ public short getMessageType() { return messageType; } + public void removeUserData() { + if (userDataRemoved) { + return; + } + + handler.removeUserData(); + if (handler instanceof SubscriptionRecordHandler) { + userDataRemoved = true; + } + } + /////////////////////////////// override /////////////////////////////// @Override @@ -101,6 +115,7 @@ public String toString() { /////////////////////////////// handlers /////////////////////////////// public List getResultSets() { + ensureUserDataAvailable(); if (handler instanceof SubscriptionRecordHandler) { return ((SubscriptionRecordHandler) handler).getResultSets(); } @@ -109,6 +124,7 @@ public List getResultSets() { } public Iterator getRecordTabletIterator() { + ensureUserDataAvailable(); if (handler instanceof SubscriptionRecordHandler) { final List resultSets = ((SubscriptionRecordHandler) handler).getResultSets(); return resultSets.stream() @@ -127,4 +143,11 @@ public SubscriptionTsFileHandler getTsFile() { throw new SubscriptionIncompatibleHandlerException( String.format("%s do not support getTsFile().", handler.getClass().getSimpleName())); } + + private void ensureUserDataAvailable() { + if (userDataRemoved) { + throw new SubscriptionRuntimeException( + String.format("User data has been removed from %s.", getClass().getSimpleName())); + } + } } diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java index 275f89c0d14a6..c6d397322473a 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessageHandler.java @@ -19,4 +19,7 @@ package org.apache.iotdb.session.subscription.payload; -public interface SubscriptionMessageHandler {} +public interface SubscriptionMessageHandler { + + default void removeUserData() {} +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java index c7f74db7695fd..69bd24d768711 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.session.subscription.payload; import org.apache.iotdb.rpc.subscription.annotation.TableModel; +import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException; import org.apache.thrift.annotation.Nullable; import org.apache.tsfile.enums.TSDataType; @@ -49,10 +50,12 @@ public class SubscriptionRecordHandler implements Iterable, SubscriptionMessageHandler { - private final List resultSets; + private final List resultSets; + + private final List resultSetView; public SubscriptionRecordHandler(final Map> tablets) { - final List resultSets = new ArrayList<>(); + final List resultSets = new ArrayList<>(); for (final Map.Entry> entry : tablets.entrySet()) { final String databaseName = entry.getKey(); final List tabletList = entry.getValue(); @@ -67,15 +70,23 @@ public SubscriptionRecordHandler(final Map> tablets) { } } this.resultSets = Collections.unmodifiableList(resultSets); + final List resultSetView = new ArrayList<>(); + resultSetView.addAll(resultSets); + this.resultSetView = Collections.unmodifiableList(resultSetView); } public List getResultSets() { - return resultSets; + return resultSetView; } @Override public Iterator iterator() { - return resultSets.iterator(); + return resultSetView.iterator(); + } + + @Override + public void removeUserData() { + resultSets.forEach(SubscriptionResultSet::removeUserData); } public static class SubscriptionResultSet extends AbstractResultSet { @@ -90,6 +101,8 @@ public static class SubscriptionResultSet extends AbstractResultSet { @TableModel private List columnCategoryList; + private volatile boolean userDataRemoved = false; + private SubscriptionResultSet(final Tablet tablet, @Nullable final String databaseName) { super(generateColumnNames(tablet, databaseName), generateColumnTypes(tablet)); this.tablet = tablet; @@ -104,11 +117,13 @@ public String getDatabaseName() { @TableModel public String getTableName() { + ensureUserDataAvailable(); return tablet.getTableName(); } @TableModel public List getColumnCategories() { + ensureUserDataAvailable(); if (Objects.nonNull(columnCategoryList)) { return columnCategoryList; } @@ -139,10 +154,12 @@ public List getColumnCategories() { } public Tablet getTablet() { + ensureUserDataAvailable(); return tablet; } public boolean hasNext() { + ensureUserDataAvailable(); return Objects.nonNull(tablet) && rowIndex + 1 < sortedRowPositions.size(); } @@ -152,6 +169,7 @@ public RowRecord nextRecord() throws IOException { } public int getColumnCount() { + ensureUserDataAvailable(); return tablet.getSchemas().size() + 1; } @@ -175,6 +193,7 @@ public List getColumnTypes() { @Override public boolean next() throws IOException { + ensureUserDataAvailable(); if (Objects.isNull(tablet)) { return false; } @@ -197,6 +216,7 @@ public void close() { @Override public Iterator iterator() { + ensureUserDataAvailable(); final Tablet currentTablet = this.tablet; if (Objects.isNull(currentTablet)) { return Collections.emptyIterator(); @@ -220,6 +240,16 @@ public TSRecord next() { }; } + private void removeUserData() { + if (userDataRemoved) { + return; + } + + userDataRemoved = true; + sortedRowPositions.clear(); + close(); + } + public enum ColumnCategory { TIME, TAG, @@ -231,6 +261,13 @@ private boolean isTableData() { return Objects.nonNull(databaseName); } + private void ensureUserDataAvailable() { + if (userDataRemoved) { + throw new SubscriptionRuntimeException( + String.format("User data has been removed from %s.", getClass().getSimpleName())); + } + } + private static List generateColumnNames( final Tablet tablet, @Nullable final String databaseName) { final List schemas = tablet.getSchemas();