[Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes#17472

Open
Pengzna wants to merge 1 commit into apache:master from Pengzna:codex/iotv2-historical-tsfile-dedup

Conversation

Collaborator

@Pengzna Pengzna commented Apr 14, 2026

Why

In IoTConsensusV2 batch mode, the same logical tsfile can be observed by both the realtime source and the historical source in the same consensus pipe task. When that happens, the duplicated historical tsfile event may allocate another replicateIndex for an already captured file and cause redundant receiver-side seal/retry behavior.

What Changed

  • Introduce a per-task-instance tsfile dedup scope ID shared by the realtime and historical sources.
  • Let the realtime source register captured tsfile paths under its task scope.
  • Let the historical source skip a tsfile event if the realtime source in the same task has already captured that file.
  • Clear the scoped progress index cache when the realtime source closes.
  • Add unit tests to verify the scoped keeper behavior.
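
The dedup handshake above can be sketched with a toy scoped keeper. All names here (ScopedTsFileKeeper, register, contains, clear) are illustrative stand-ins, not the actual IoTDB API; the real keeper stores ProgressIndex values rather than a bare path set.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the per-task dedup scope described above.
public class ScopedTsFileKeeper {
    // dataRegionId -> taskScopeID -> captured tsfile paths
    private static final Map<Integer, Map<String, Set<String>>> KEEPER =
            new ConcurrentHashMap<>();

    // Realtime source: record a captured tsfile under its task scope.
    public static void register(int regionId, String scopeId, String path) {
        KEEPER.computeIfAbsent(regionId, k -> new ConcurrentHashMap<>())
              .computeIfAbsent(scopeId, k -> ConcurrentHashMap.newKeySet())
              .add(path);
    }

    // Historical source: skip a tsfile already captured by the realtime
    // source sharing the same scope. Read-only, so no computeIfAbsent.
    public static boolean contains(int regionId, String scopeId, String path) {
        Map<String, Set<String>> scopes = KEEPER.get(regionId);
        if (scopes == null) {
            return false;
        }
        Set<String> files = scopes.get(scopeId);
        return files != null && files.contains(path);
    }

    // Realtime source close(): drop all state for this task scope.
    public static void clear(int regionId, String scopeId) {
        Map<String, Set<String>> scopes = KEEPER.get(regionId);
        if (scopes != null) {
            scopes.remove(scopeId);
        }
    }
}
```

Keying by a per-task-instance scope ID (rather than pipe name) is what lets two sources of the same task share dedup state while keeping other tasks isolated.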

Verification

  • mvn -pl iotdb-core/datanode -DskipITs -Dtest=PipeTsFileEpochProgressIndexKeeperTest test
  • mvn clean package -DskipTests -T 1C
  • local 3C3D IoTDB repro with data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensusV2
  • workload: DEVICE_NUMBER=10, SENSOR_NUMBER=1, BATCH_SIZE_PER_WRITE=100000, LOOP=10
  • executed 3 rounds of iot-benchmark -> flush
  • observed logs such as "skip historical tsfile ... because realtime source in current task ... has already captured it"
  • no receiver-side "writing file null is not available", "Failed to seal file", or 2204 errors were observed in this repro

Copilot AI review requested due to automatic review settings April 14, 2026 07:30
Contributor

Copilot AI left a comment


Pull request overview

This PR addresses duplicate tsfile transfer in IoTConsensusV2 “batch mode” pipes by introducing a per-task-instance dedup scope shared between the realtime and historical sources, allowing the historical source to skip tsfiles already captured by the realtime source.

Changes:

  • Add a per-task tsFileDedupScopeID and use it as the key for tsfile progress index tracking/dedup.
  • Register realtime-captured tsfiles into a scoped keeper and have the historical source skip duplicates under the same scope.
  • Add unit tests covering scoped behavior (contains/check/clear) of the keeper.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Summary per file:
  • iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java — Switch keeper keying from pipeName to task-scope ID; add scoped clear + contains APIs.
  • iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java — Generate/store task-scope ID and clear scoped keeper state on close; use scope in progress-index checks.
  • iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java — Register/eliminate tsfile progress index using the new task-scope ID.
  • iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java — Register/eliminate tsfile progress index using the new task-scope ID.
  • iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java — Compute matching task-scope ID and skip historical tsfile events already captured by realtime under that scope.
  • iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java — New unit tests verifying the keeper's scope isolation and scoped clearing.
Comments suppressed due to low confidence (1)

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java:83

  • isProgressIndexAfterOrEquals name doesn’t match its predicate: !resource.getMaxProgressIndex().isAfter(progressIndex) is equivalent to maxProgressIndex <= progressIndex (i.e., before or equal), not after or equal. This makes the API very easy to misuse. Please either invert the comparison to match the name, or rename the method to reflect the current behavior (and adjust tests/callers accordingly).
  public synchronized boolean isProgressIndexAfterOrEquals(
      final int dataRegionId,
      final String taskScopeID,
      final String tsFilePath,
      final ProgressIndex progressIndex) {
    return progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .entrySet()
        .stream()
        .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
        .map(Entry::getValue)
        .filter(Objects::nonNull)
        .anyMatch(resource -> !resource.getMaxProgressIndex().isAfter(progressIndex));
  }
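
The naming mismatch flagged above is easy to verify with a toy long-valued index standing in for the real ProgressIndex (illustrative only; the actual class has a richer comparison API):

```java
// Toy model of the predicate: !max.isAfter(given) is "max <= given",
// i.e. "before or equals", not "after or equals" as the method name claims.
public class PredicateDemo {
    static boolean isAfter(long self, long other) {
        return self > other;
    }

    // Mirrors the body of isProgressIndexAfterOrEquals for a single entry.
    static boolean misnamedAfterOrEquals(long maxProgressIndex, long givenProgressIndex) {
        return !isAfter(maxProgressIndex, givenProgressIndex);
    }
}
```

A call with max = 3 and given = 5 returns true even though 3 is strictly before 5, which is exactly the misuse hazard the review points out.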


Comment on lines 46 to 52
  public synchronized void eliminateProgressIndex(
-     final int dataRegionId, final @Nonnull String pipeName, final String filePath) {
+     final int dataRegionId, final @Nonnull String taskScopeID, final String filePath) {
    progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-       .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+       .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .remove(filePath);
  }

Copilot AI Apr 14, 2026


PipeTsFileEpochProgressIndexKeeper changed the eliminateProgressIndex signature from pipeName to taskScopeID, but there are still call sites passing pipeName (e.g., in PipeTsFileInsertionEvent and PipeCompactedTsFileInsertionEvent). This will break compilation and also prevents removing the correct scoped entry. Please update all remaining callers to pass the same task-scope ID used during registerProgressIndex, or provide a compatible overload if some callers cannot access the scope ID.

Comment on lines +54 to +67
  public synchronized void clearProgressIndex(
      final int dataRegionId, final @Nonnull String taskScopeID) {
    progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .remove(taskScopeID);
  }

  public synchronized boolean containsTsFile(
      final int dataRegionId, final @Nonnull String taskScopeID, final String tsFilePath) {
    return progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .containsKey(tsFilePath);
  }

Copilot AI Apr 14, 2026


clearProgressIndex/containsTsFile currently use computeIfAbsent, which creates new empty maps as a side effect (e.g., calling clearProgressIndex during close() when no tsfiles were ever registered will still allocate and retain an empty dataRegionId entry). Consider using get(...) with null checks instead, and optionally removing the dataRegionId entry when its scope map becomes empty, to avoid unbounded growth / unnecessary allocations.
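
A minimal demonstration of that side effect, using plain ConcurrentHashMap rather than the IoTDB classes (the method names here are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// computeIfAbsent inserts an empty inner map even on a pure read path,
// while a null-checked get() allocates and retains nothing.
public class ComputeIfAbsentDemo {
    public static boolean readWithComputeIfAbsent(
            Map<Integer, Map<String, String>> keeper, int regionId, String key) {
        // Side effect: an empty map is created and kept for regionId.
        return keeper.computeIfAbsent(regionId, k -> new ConcurrentHashMap<>())
                     .containsKey(key);
    }

    public static boolean readWithGet(
            Map<Integer, Map<String, String>> keeper, int regionId, String key) {
        // No side effect: missing regionId simply yields false.
        Map<String, String> inner = keeper.get(regionId);
        return inner != null && inner.containsKey(key);
    }
}
```

With the get-based form, a close() that never registered any tsfile leaves the keeper untouched instead of growing it by one empty entry per data region.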

Comment on lines +863 to +885
if (shouldSkipHistoricalTsFileEvent(resource)) {
  filteredTsFileResources2TableNames.remove(resource);
  LOGGER.info(
      "Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.",
      pipeName,
      dataRegionId,
      resource.getTsFilePath(),
      tsFileDedupScopeID);
  try {
    return null;
  } finally {
    try {
      PipeDataNodeResourceManager.tsfile()
          .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
    } catch (final IOException e) {
      LOGGER.warn(
          "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}",
          pipeName,
          dataRegionId,
          resource.getTsFilePath());
    }
  }
}

Copilot AI Apr 14, 2026


When a historical tsfile is skipped (shouldSkipHistoricalTsFileEvent), supplyTsFileEvent returns null. In the processor pipeline, a null supply result is treated as “no work” and causes the worker to sleep (see PipeProcessorSubtaskWorker.sleepIfNecessary), which can significantly slow historical consumption if there are multiple skipped duplicates. Consider continuing to poll the next PersistentResource within the same supply() call (or emitting an appropriate progress/report event) instead of returning null for a skipped resource.
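
The suggested fix can be sketched as a drain loop; pendingQueue, shouldSkip, and the String-typed resources are illustrative stand-ins for the real PersistentResource pipeline, not IoTDB API:

```java
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical sketch: keep polling within one supply() call instead of
// returning null for each skipped duplicate, so the worker only sleeps
// when the queue is genuinely drained.
public class SupplyLoopSketch {
    public static String supplyNext(Deque<String> pendingQueue, Predicate<String> shouldSkip) {
        String resource;
        while ((resource = pendingQueue.poll()) != null) {
            if (shouldSkip.test(resource)) {
                // The real code would also unpin the skipped resource here.
                continue;
            }
            // First non-duplicate resource becomes the supplied event.
            return resource;
        }
        return null; // only when nothing is left to supply
    }
}
```

A run of consecutive duplicates then costs one loop pass instead of one sleep cycle each.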

Comment on lines +874 to +882
try {
  PipeDataNodeResourceManager.tsfile()
      .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
} catch (final IOException e) {
  LOGGER.warn(
      "Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}",
      pipeName,
      dataRegionId,
      resource.getTsFilePath());
Contributor


Please also pass the caught exception to LOGGER.warn so the stack trace is logged.

Comment on lines +61 to +67
  public synchronized boolean containsTsFile(
      final int dataRegionId, final @Nonnull String taskScopeID, final String tsFilePath) {
    return progressIndexKeeper
        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
        .containsKey(tsFilePath);
  }
Contributor


No need to compute new entries when the dataRegionId/taskScopeID is missing; just return false.
