From e85851c6a16d5c25441e4b8198633a7fcaf8f6b8 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sun, 8 Feb 2026 13:58:22 -0500 Subject: [PATCH 1/2] NIFI-15570: Keep track of Content Claims where the last Claim in a Resource Claim can be truncated if it is large. Whenever FlowFile Repository is checkpointed, truncate any large Resource Claims when possible and necessary to avoid having a situtation where a small FlowFile in a given Resource Claim prevents a large Content Claim from being cleaned up. --- .../repository/claim/ContentClaim.java | 8 + .../claim/ResourceClaimManager.java | 21 ++ .../repository/FileSystemRepository.java | 222 +++++++++++++- .../WriteAheadFlowFileRepository.java | 137 +++++++-- .../controller/TestFileSystemSwapManager.java | 9 + .../repository/TestFileSystemRepository.java | 280 ++++++++++++++++- .../TestWriteAheadFlowFileRepository.java | 285 ++++++++++++++++++ .../claim/StandardContentClaim.java | 10 + .../claim/StandardResourceClaimManager.java | 31 ++ .../TestStandardResourceClaimManager.java | 51 ++++ .../ByteArrayContentRepository.java | 5 + .../system/GenerateTruncatableFlowFiles.java | 114 +++++++ .../org.apache.nifi.processor.Processor | 1 + .../ContentClaimTruncationAfterRestartIT.java | 163 ++++++++++ .../ContentClaimTruncationIT.java | 153 ++++++++++ 15 files changed, 1443 insertions(+), 47 deletions(-) create mode 100644 nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java index 5c1d76bebbeb..54745f9d28fc 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java @@ -44,4 +44,12 @@ public interface ContentClaim extends Comparable { * @return the length of this ContentClaim */ long getLength(); + + /** + * Indicates whether or not this ContentClaim is a candidate for truncation. + * @return true if this ContentClaim is a candidate for truncation, false otherwise + */ + default boolean isTruncationCandidate() { + return false; + } } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java index 4c68383d86be..4a54d371a1fc 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java @@ -112,6 +112,17 @@ public interface ResourceClaimManager { */ void markDestructable(ResourceClaim claim); + /** + * Indicates that the Resource Claim associated with the given Content Claim can now be + * truncated to the start of the ContentClaim. This should only ever be called after it is + * guaranteed that the FlowFile Repository has been synchronized with its underlying + * storage component for the same reason as described in the {@link #markDestructable(ResourceClaim)} + * method. 
+ * + * @param claim the ContentClaim that should be used for truncation + */ + void markTruncatable(ContentClaim claim); + /** * Drains up to {@code maxElements} Content Claims from the internal queue * of destructable content claims to the given {@code destination} so that @@ -138,6 +149,16 @@ public interface ResourceClaimManager { */ void drainDestructableClaims(Collection destination, int maxElements, long timeout, TimeUnit unit); + /** + * Drains up to {@code maxElements} Content Claims from the internal queue + * of truncatable content claims to the given {@code destination} so that + * they can be truncated. + * + * @param destination to drain to + * @param maxElements max items to drain + */ + void drainTruncatableClaims(Collection destination, int maxElements); + /** * Clears the manager's memory of any and all ResourceClaims that it knows * about diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 15ec9e78cddf..dcef02615f5d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -47,8 +47,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.FileChannel; import java.nio.file.FileVisitResult; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardOpenOption; @@ -99,9 +101,10 @@ public class FileSystemRepository implements ContentRepository { private final List containerNames; private final AtomicLong index; - private 
final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true); + private final ScheduledExecutorService executor = new FlowEngine(6, "FileSystemRepository Workers", true); private final ConcurrentMap> reclaimable = new ConcurrentHashMap<>(); private final Map containerStateMap = new HashMap<>(); + private final TruncationClaimManager truncationClaimManager = new TruncationClaimManager(); // Queue for claims that are kept open for writing. Ideally, this will be at // least as large as the number of threads that will be updating the repository simultaneously but we don't want @@ -170,12 +173,13 @@ public FileSystemRepository(final NiFiProperties nifiProperties) throws IOExcept archiveData = true; if (maxArchiveSize == null) { - throw new RuntimeException("No value specified for property '" - + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving."); + throw new RuntimeException("No value specified for property '%s' but archiving is enabled. You must configure the max disk usage in order to enable archiving.".formatted( + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE)); } if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) { - throw new RuntimeException("Invalid value specified for the '" + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' property. Value must be in format: %"); + throw new RuntimeException("Invalid value specified for the '%s' property. 
Value must be in format: %%".formatted( + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE)); } } else if ("false".equalsIgnoreCase(enableArchiving)) { archiveData = false; @@ -238,14 +242,15 @@ public void initialize(final ContentRepositoryContext context) { this.resourceClaimManager = context.getResourceClaimManager(); this.eventReporter = context.getEventReporter(); - final Map fileRespositoryPaths = nifiProperties.getContentRepositoryPaths(); + final Map fileRepositoryPaths = nifiProperties.getContentRepositoryPaths(); executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1, TimeUnit.SECONDS); - for (int i = 0; i < fileRespositoryPaths.size(); i++) { + for (int i = 0; i < fileRepositoryPaths.size(); i++) { executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS); } final long cleanupMillis = this.determineCleanupInterval(nifiProperties); + executor.scheduleWithFixedDelay(new TruncateClaims(), cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS); for (final Map.Entry containerEntry : containers.entrySet()) { final String containerName = containerEntry.getKey(); @@ -689,7 +694,16 @@ public ContentClaim create(final boolean lossTolerant) throws IOException { @Override public int incrementClaimaintCount(final ContentClaim claim) { - return incrementClaimantCount(claim == null ? null : claim.getResourceClaim(), false); + if (claim == null) { + return 0; + } + + if (claim.isTruncationCandidate() && claim instanceof final StandardContentClaim scc) { + LOG.debug("{} is a truncation candidate, but is being claimed again. 
Setting truncation candidate to false.", claim); + scc.setTruncationCandidate(false); + } + + return incrementClaimantCount(claim.getResourceClaim(), false); } protected int incrementClaimantCount(final ResourceClaim resourceClaim, final boolean newClaim) { @@ -741,6 +755,7 @@ private boolean remove(final ResourceClaim claim) { } } + truncationClaimManager.removeTruncationClaims(claim); return true; } @@ -1032,6 +1047,119 @@ public void purge() { resourceClaimManager.purge(); } + private class TruncateClaims implements Runnable { + + @Override + public void run() { + final Map truncationActivationCache = new HashMap<>(); + + // Go through any known truncation claims and truncate them now if truncation is enabled for their container. + for (final String container : containerNames) { + if (isTruncationActiveForContainer(container, truncationActivationCache)) { + final List toTruncate = truncationClaimManager.removeTruncationClaims(container); + if (toTruncate.isEmpty()) { + continue; + } + + truncateClaims(toTruncate, truncationActivationCache); + } + } + + // Drain any Truncation Claims from the Resource Claim Manager. + // If able, truncate those claims. Otherwise, save those claims in the Truncation Claim Manager to be truncated on the next run. + // This prevents us from having a case where we could truncate a big claim but we don't because we're not yet running out of disk space, + // but then we later start to run out of disk space and lost the opportunity to truncate that big claim. 
+ while (true) { + final List toTruncate = new ArrayList<>(); + resourceClaimManager.drainTruncatableClaims(toTruncate, 10_000); + if (toTruncate.isEmpty()) { + return; + } + + truncateClaims(toTruncate, truncationActivationCache); + } + } + + private void truncateClaims(final List toTruncate, final Map truncationActivationCache) { + final Map> claimsSkipped = new HashMap<>(); + + for (final ContentClaim claim : toTruncate) { + final String container = claim.getResourceClaim().getContainer(); + if (!isTruncationActiveForContainer(container, truncationActivationCache)) { + LOG.debug("Will not truncate {} because truncation is not active for container {}; will save for later truncation.", claim, container); + claimsSkipped.computeIfAbsent(container, key -> new ArrayList<>()).add(claim); + continue; + } + + if (claim.isTruncationCandidate()) { + truncate(claim); + } + } + + claimsSkipped.forEach(truncationClaimManager::addTruncationClaims); + } + + private boolean isTruncationActiveForContainer(final String container, final Map activationCache) { + // If not archiving data, we consider truncation always active. + if (!archiveData) { + return true; + } + + final Boolean cachedValue = activationCache.get(container); + if (cachedValue != null) { + return cachedValue; + } + + if (!isArchiveClearedOnLastRun(container)) { + LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run.", container); + activationCache.put(container, false); + return false; + } + + final long usableSpace; + try { + usableSpace = getContainerUsableSpace(container); + } catch (final IOException ioe) { + LOG.warn("Failed to determine usable space for container {}. 
Will not truncate claims for this container.", container, ioe); + return false; + } + + final Long minUsableSpace = minUsableContainerBytesForArchive.get(container); + if (minUsableSpace != null && usableSpace < minUsableSpace) { + LOG.debug("Truncate is active for Container {} because usable space of {} bytes is below the desired threshold of {} bytes.", + container, usableSpace, minUsableSpace); + + activationCache.put(container, true); + return true; + } + + activationCache.put(container, false); + return false; + } + + private void truncate(final ContentClaim claim) { + LOG.info("Truncating {} to {} bytes because the FlowFile occupying the last {} bytes has been removed", + claim.getResourceClaim(), claim.getOffset(), claim.getLength()); + + final Path path = getPath(claim); + if (path == null) { + LOG.warn("Cannot truncate {} because the file cannot be found", claim); + return; + } + + try (final FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE)) { + fileChannel.truncate(claim.getOffset()); + } catch (final NoSuchFileException nsfe) { + // This is unlikely but can occur if the claim was truncatable and the underlying Resource Claim becomes + // destructable. In this case, we may archive or delete the entire ResourceClaim. This is safe to ignore, + // since it means the data is cleaned up anyway. 
+ LOG.debug("Failed to truncate {} because file does not exist.", claim, nsfe); + } catch (final IOException e) { + LOG.warn("Failed to truncate {} to {} bytes", claim, claim.getOffset(), e); + } + } + } + private class BinDestructableClaims implements Runnable { @Override @@ -1120,6 +1248,11 @@ boolean archive(final ResourceClaim claim) throws IOException { final boolean archived = archive(curPath); LOG.debug("Successfully moved {} to archive", claim); + + if (archived) { + truncationClaimManager.removeTruncationClaims(claim); + } + return archived; } @@ -1392,7 +1525,7 @@ public FileVisitResult visitFile(final Path file, final BasicFileAttributes attr if (notYetExceedingThreshold.isEmpty()) { oldestContainerArchive = System.currentTimeMillis(); } else { - oldestContainerArchive = notYetExceedingThreshold.get(0).getLastModTime(); + oldestContainerArchive = notYetExceedingThreshold.getFirst().getLastModTime(); } // Queue up the files in the order that they should be destroyed so that we don't have to scan the directories for a while. 
@@ -1400,10 +1533,11 @@ public FileVisitResult visitFile(final Path file, final BasicFileAttributes attr fileQueue.offer(toEnqueue); } + containerState.setArchiveClearedOnLastRun(notYetExceedingThreshold.isEmpty()); + final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis; LOG.debug("Oldest Archive Date for Container {} is {}; delete expired = {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms", containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis); - return; } private class ArchiveOrDestroyDestructableClaims implements Runnable { @@ -1543,6 +1677,7 @@ private class ContainerState { private volatile long bytesUsed = 0L; private volatile long checkUsedCutoffTimestamp = 0L; + private volatile boolean archiveClearedOnLastRun = false; public ContainerState(final String containerName, final boolean archiveEnabled, final long backPressureBytes, final long capacity) { this.containerName = containerName; @@ -1661,6 +1796,24 @@ public long getArchiveCount() { public void decrementArchiveCount() { archivedFileCount.decrementAndGet(); } + + public void setArchiveClearedOnLastRun(final boolean archiveClearedOnLastRun) { + this.archiveClearedOnLastRun = archiveClearedOnLastRun; + } + + public boolean isArchiveClearedOnLastRun() { + return archiveClearedOnLastRun; + } + } + + // Visible for testing + protected boolean isArchiveClearedOnLastRun(final String containerName) { + final ContainerState containerState = containerStateMap.get(containerName); + if (containerState == null) { + return false; + } + + return containerState.isArchiveClearedOnLastRun(); } protected static class ClaimLengthPair { @@ -1882,19 +2035,27 @@ public synchronized void close() throws IOException { // Mark the claim as no longer being able to be written to resourceClaimManager.freeze(scc.getResourceClaim()); + // If the content claim 
length is large (> 1 MB or the max appendable claim length), + // mark the claim as a truncation candidate + final boolean largeClaim = scc.getLength() > Math.min(1_000_000, maxAppendableClaimLength); + final boolean nonStartClaim = scc.getOffset() > 0; + if (largeClaim && nonStartClaim) { + scc.setTruncationCandidate(true); + } + // ensure that the claim is no longer on the queue writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength)); - bcos.close(); - LOG.debug("Claim lenth >= max; Closing {}", this); + LOG.debug("Claim length >= max; Closing {}", this); if (LOG.isTraceEnabled()) { LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this)); } + bcos.close(); } } @Override - public synchronized ContentClaim newContentClaim() throws IOException { + public synchronized ContentClaim newContentClaim() { scc = new StandardContentClaim(scc.getResourceClaim(), scc.getOffset() + Math.max(0, scc.getLength())); initialLength = 0; bytesWritten = 0L; @@ -1903,4 +2064,41 @@ public synchronized ContentClaim newContentClaim() throws IOException { } } + private static class TruncationClaimManager { + private static final int MAX_THRESHOLD = 100_000; + private final Map> truncationClaims = new HashMap<>(); + + public synchronized void addTruncationClaims(final String container, final List claim) { + final List contentClaims = truncationClaims.computeIfAbsent(container, c -> new ArrayList<>()); + contentClaims.addAll(claim); + + // If we have too many claims, remove the smallest ones so that we only have the largest MAX_THRESHOLD claims. 
+ if (contentClaims.size() > MAX_THRESHOLD) { + contentClaims.sort(Comparator.comparingLong(ContentClaim::getLength).reversed()); + final List discardableClaims = contentClaims.subList(MAX_THRESHOLD, contentClaims.size()); + LOG.debug("Truncation Claim Manager has more than {} claims for container {}; discarding {} claims: {}", + MAX_THRESHOLD, container, discardableClaims.size(), discardableClaims); + discardableClaims.clear(); + } + } + + public synchronized List removeTruncationClaims(final String container) { + final List removed = truncationClaims.remove(container); + return removed == null ? Collections.emptyList() : removed; + } + + public synchronized List removeTruncationClaims(final ResourceClaim resourceClaim) { + final List contentClaims = truncationClaims.get(resourceClaim.getContainer()); + if (contentClaims == null) { + return Collections.emptyList(); + } + + final List claimsToRemove = contentClaims.stream() + .filter(cc -> cc.getResourceClaim().equals(resourceClaim)) + .toList(); + + contentClaims.removeAll(claimsToRemove); + return claimsToRemove; + } + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index a04b7527917d..963f02e028a7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -21,7 +21,9 @@ import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import 
org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.repository.schema.FieldCache; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; @@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private final List flowFileRepositoryPaths = new ArrayList<>(); private final ScheduledExecutorService checkpointExecutor; private final int maxCharactersToCache; + private final long truncationThreshold; private volatile Collection recoveredRecords = null; private final Set orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>()); @@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed // on restart. private final ConcurrentMap> claimsAwaitingDestruction = new ConcurrentHashMap<>(); + private final ConcurrentMap> claimsAwaitingTruncation = new ConcurrentHashMap<>(); /** * default no args constructor for service loading only. 
@@ -143,6 +147,7 @@ public WriteAheadFlowFileRepository() { nifiProperties = null; retainOrphanedFlowFiles = true; maxCharactersToCache = 0; + truncationThreshold = Long.MAX_VALUE; } public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { @@ -153,6 +158,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty); this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE); + final long maxAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength); final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX); flowFileRepositoryPaths.add(new File(directoryName)); @@ -445,12 +452,13 @@ protected void updateContentClaims(Collection repositoryRecord // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful. // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim, // it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content - // Claim to the 'claimsAwaitDestruction' map. As a result, we can call #markDestructable with the same ContentClaim + // Claim to the 'claimsAwaitingDestruction' map. As a result, we can call #markDestructable with the same ContentClaim // multiple times, and the #markDestructable method is not necessarily idempotent. // However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times. // This does not, however, cause problems, as ContentRepository should handle this // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface. 
- final Set claimsToAdd = new HashSet<>(); + final Set destructableClaims = new HashSet<>(); + final Set truncatableClaims = new HashSet<>(); final Set swapLocationsAdded = new HashSet<>(); final Set swapLocationsRemoved = new HashSet<>(); @@ -458,20 +466,34 @@ protected void updateContentClaims(Collection repositoryRecord for (final RepositoryRecord record : repositoryRecords) { updateClaimCounts(record); + final ContentClaim contentClaim = record.getCurrentClaim(); + final boolean truncationCandidate = contentClaim != null && contentClaim.isTruncationCandidate(); + final boolean claimChanged = !Objects.equals(record.getOriginalClaim(), contentClaim); if (record.getType() == RepositoryRecordType.DELETE) { - // For any DELETE record that we have, if claim is destructible, mark it so - if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) { - claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); + // For any DELETE record that we have, if claim is destructible or truncatable, mark it so + if (isDestructable(contentClaim)) { + destructableClaims.add(contentClaim.getResourceClaim()); + } else if (truncationCandidate) { + truncatableClaims.add(contentClaim); } - // If the original claim is different than the current claim and the original claim is destructible, mark it so - if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && isDestructable(record.getOriginalClaim())) { - claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); + // If the original claim is different than the current claim and the original claim is destructible + // or truncatable, mark it so + if (claimChanged) { + if (isDestructable(record.getOriginalClaim())) { + destructableClaims.add(record.getOriginalClaim().getResourceClaim()); + } else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) { + truncatableClaims.add(record.getOriginalClaim()); + } } } else if 
(record.getType() == RepositoryRecordType.UPDATE) { // if we have an update, and the original is no longer needed, mark original as destructible - if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) { - claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); + if (claimChanged) { + if (isDestructable(record.getOriginalClaim())) { + destructableClaims.add(record.getOriginalClaim().getResourceClaim()); + } else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) { + truncatableClaims.add(record.getOriginalClaim()); + } } } else if (record.getType() == RepositoryRecordType.SWAP_OUT) { final String swapLocation = record.getSwapLocation(); @@ -484,13 +506,16 @@ protected void updateContentClaims(Collection repositoryRecord } } - // Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction + // Once the content claim counts have been updated for all records, collect any transient + // claims that are eligible for destruction or truncation for (final RepositoryRecord record : repositoryRecords) { final List transientClaims = record.getTransientClaims(); if (transientClaims != null) { for (final ContentClaim transientClaim : transientClaims) { if (isDestructable(transientClaim)) { - claimsToAdd.add(transientClaim.getResourceClaim()); + destructableClaims.add(transientClaim.getResourceClaim()); + } else if (transientClaim.isTruncationCandidate()) { + truncatableClaims.add(transientClaim); } } } @@ -504,19 +529,15 @@ protected void updateContentClaims(Collection repositoryRecord } } - if (!claimsToAdd.isEmpty()) { - // Get / Register a Set for the given Partition Index - final Integer partitionKey = Integer.valueOf(partitionIndex); - BlockingQueue claimQueue = claimsAwaitingDestruction.get(partitionKey); - if (claimQueue == null) { - claimQueue = new 
LinkedBlockingQueue<>(); - final BlockingQueue existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); - if (existingClaimQueue != null) { - claimQueue = existingClaimQueue; - } - } + if (!destructableClaims.isEmpty()) { + // Get / Register a Set for the given Partition Index + final BlockingQueue claimQueue = claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>()); + claimQueue.addAll(destructableClaims); + } - claimQueue.addAll(claimsToAdd); + if (!truncatableClaims.isEmpty()) { + final BlockingQueue claimQueue = claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>()); + claimQueue.addAll(truncatableClaims); } } @@ -566,16 +587,24 @@ private static String getLocationSuffix(final String swapLocation) { @Override public void onSync(final int partitionIndex) { - final BlockingQueue claimQueue = claimsAwaitingDestruction.get(partitionIndex); - if (claimQueue == null) { - return; + final BlockingQueue destructionClaimQueue = claimsAwaitingDestruction.get(partitionIndex); + if (destructionClaimQueue != null) { + final Set claimsToDestroy = new HashSet<>(); + destructionClaimQueue.drainTo(claimsToDestroy); + + for (final ResourceClaim claim : claimsToDestroy) { + markDestructable(claim); + } } - final Set claimsToDestroy = new HashSet<>(); - claimQueue.drainTo(claimsToDestroy); + final BlockingQueue truncationClaimQueue = claimsAwaitingTruncation.get(partitionIndex); + if (truncationClaimQueue != null) { + final Set claimsToTruncate = new HashSet<>(); + truncationClaimQueue.drainTo(claimsToTruncate); - for (final ResourceClaim claim : claimsToDestroy) { - markDestructable(claim); + for (final ContentClaim claim : claimsToTruncate) { + claimManager.markTruncatable(claim); + } } } @@ -589,6 +618,15 @@ public void onGlobalSync() { markDestructable(claim); } } + + for (final BlockingQueue claimQueue : claimsAwaitingTruncation.values()) { + final Set claimsToTruncate = 
new HashSet<>(); + claimQueue.drainTo(claimsToTruncate); + + for (final ContentClaim claim : claimsToTruncate) { + claimManager.markTruncatable(claim); + } + } } /** @@ -723,6 +761,10 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException queueMap.put(queue.getIdentifier(), queue); } + final Set truncationEligibleClaims = new HashSet<>(); + final Set forbiddenTruncationClaims = new HashSet<>(); + final Map latestContentClaimByResourceClaim = new HashMap<>(); + final List dropRecords = new ArrayList<>(); int numFlowFilesMissingQueue = 0; long maxId = 0; @@ -748,6 +790,15 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException } final ContentClaim claim = record.getContentClaim(); + + // Track the latest Content Claim for each Resource Claim so that we can determine which claims are eligible for truncation. + if (claim != null) { + final ContentClaim latestContentClaim = latestContentClaimByResourceClaim.get(claim.getResourceClaim()); + if (latestContentClaim == null || claim.getOffset() > latestContentClaim.getOffset()) { + latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim); + } + } + final FlowFileQueue flowFileQueue = queueMap.get(queueId); final boolean orphaned = flowFileQueue == null; if (orphaned) { @@ -777,6 +828,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException continue; } else if (claim != null) { + // If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim, + // we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the + // same claim, we cannot truncate it because doing so would affect the other FlowFiles. 
+ if (claim.getOffset() > 0 && claim.getLength() > truncationThreshold && claim instanceof final StandardContentClaim scc) { + if (forbiddenTruncationClaims.contains(claim) || truncationEligibleClaims.contains(scc)) { + truncationEligibleClaims.remove(scc); + forbiddenTruncationClaims.add(scc); + } else { + truncationEligibleClaims.add(scc); + } + } + claimManager.incrementClaimantCount(claim.getResourceClaim()); } @@ -786,6 +849,16 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException // If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected. recoveredRecords = null; + // If any Content Claim was determined to be truncatable, mark it as such now. + for (final StandardContentClaim eligible : truncationEligibleClaims) { + final ContentClaim latestForResource = latestContentClaimByResourceClaim.get(eligible.getResourceClaim()); + if (!Objects.equals(eligible, latestForResource)) { + continue; + } + + eligible.setTruncationCandidate(true); + } + // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will // return the appropriate number. 
flowFileSequenceGenerator.set(maxId + 1); @@ -852,7 +925,7 @@ public long getNextFlowFileSequence() { } @Override - public long getMaxFlowFileIdentifier() throws IOException { + public long getMaxFlowFileIdentifier() { // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong return flowFileSequenceGenerator.get() - 1; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 9e0a1324a48f..5a6b9d89bf61 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -21,6 +21,7 @@ import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.SwapContents; import org.apache.nifi.controller.repository.SwapManagerInitializationContext; +import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.events.EventReporter; @@ -223,6 +224,10 @@ public int incrementClaimantCount(ResourceClaim claim, boolean newClaim) { public void markDestructable(ResourceClaim claim) { } + @Override + public void markTruncatable(final ContentClaim claim) { + } + @Override public void drainDestructableClaims(Collection destination, int maxElements) { } @@ -231,6 +236,10 @@ public void drainDestructableClaims(Collection destination, int m public void drainDestructableClaims(Collection destination, int maxElements, long timeout, TimeUnit unit) { } + @Override + public void drainTruncatableClaims(final 
Collection destination, final int maxElements) { + } + @Override public void purge() { } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 7f0e2a7a9d73..206ff60d3349 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -59,6 +59,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -87,7 +88,10 @@ public void setup() throws IOException { originalNifiPropertiesFile = Paths.get("src/test/resources/conf/nifi.properties"); rootFile = tempDir.resolve("content_repository"); final String contentRepositoryDirectory = NiFiProperties.REPOSITORY_CONTENT_PREFIX.concat("default"); - final Map additionalProperties = Map.of(contentRepositoryDirectory, rootFile.toString()); + final Map additionalProperties = Map.of( + contentRepositoryDirectory, rootFile.toString(), + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec" + ); nifiProperties = NiFiProperties.createBasicNiFiProperties(originalNifiPropertiesFile.toString(), additionalProperties); repository = new FileSystemRepository(nifiProperties); claimManager = new StandardResourceClaimManager(); @@ -502,10 +506,14 @@ public void testRemoveDeletesFileIfNoClaimants() throws IOException { } private Path getPath(final ContentClaim claim) { + return getPath(repository, claim); + } + + private Path getPath(final FileSystemRepository 
repo, final ContentClaim claim) { try { - final Method m = repository.getClass().getDeclaredMethod("getPath", ContentClaim.class); + final Method m = FileSystemRepository.class.getDeclaredMethod("getPath", ContentClaim.class); m.setAccessible(true); - return (Path) m.invoke(repository, claim); + return (Path) m.invoke(repo, claim); } catch (final Exception e) { throw new RuntimeException("Could not invoke #getPath on FileSystemRepository due to " + e); } @@ -897,6 +905,272 @@ protected boolean archive(Path curPath) { } } + @Test + public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() throws IOException { + final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + + // Create a small claim C1 at offset 0. Write less data than maxAppendableClaimLength so the ResourceClaim + // is recycled back to the writable queue. + final ContentClaim c1 = repository.create(false); + final byte[] smallData = new byte[100]; + try (final OutputStream out = repository.write(c1)) { + out.write(smallData); + } + // C1 should NOT be a truncation candidate (it's small) + assertFalse(c1.isTruncationCandidate(), "Small claim at offset 0 should not be a truncation candidate"); + + // Now create C2 on potentially the same ResourceClaim, writing more than maxAppendableClaimLength to freeze + // the ResourceClaim. Because c1 was small and recycled, c2 will be at a non-zero offset on the same ResourceClaim. 
+ final ContentClaim c2 = repository.create(false); + final byte[] largeData = new byte[(int) maxClaimLength + 1024]; + try (final OutputStream out = repository.write(c2)) { + out.write(largeData); + } + // C2 should be a truncation candidate: large and at a non-zero offset + assertTrue(c2.isTruncationCandidate(), "Large claim at non-zero offset should be a truncation candidate"); + + // Negative case: create a standalone large claim at offset 0 (fresh ResourceClaim) + // To ensure a fresh ResourceClaim, write large data to all writable claims to exhaust them, + // then create a new claim that starts at offset 0. + // The simplest approach: create claims until we get one at offset 0. + ContentClaim offsetZeroClaim = null; + for (int i = 0; i < 20; i++) { + final ContentClaim candidate = repository.create(false); + if (candidate instanceof StandardContentClaim scc && scc.getOffset() == 0) { + // Write large data that exceeds maxAppendableClaimLength + try (final OutputStream out = repository.write(candidate)) { + out.write(new byte[(int) maxClaimLength + 1024]); + } + offsetZeroClaim = candidate; + break; + } else { + // Write large data to exhaust this claim's ResourceClaim + try (final OutputStream out = repository.write(candidate)) { + out.write(new byte[(int) maxClaimLength + 1024]); + } + } + } + + assertNotNull(offsetZeroClaim, "Should have found a claim at offset 0"); + assertFalse(offsetZeroClaim.isTruncationCandidate(), "Large claim at offset 0 should NOT be a truncation candidate"); + } + + @Test + public void testIncrementClaimantCountClearsTruncationCandidate() throws IOException { + final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + + // Create a small claim to start a ResourceClaim, then a large claim to freeze it + final ContentClaim c1 = repository.create(false); + try (final OutputStream out = repository.write(c1)) { + out.write(new byte[100]); + } + + final ContentClaim c2 = 
repository.create(false); + try (final OutputStream out = repository.write(c2)) { + out.write(new byte[(int) maxClaimLength + 1024]); + } + + // c2 should be a truncation candidate + assertTrue(c2.isTruncationCandidate(), "Claim should be a truncation candidate before incrementClaimaintCount"); + + // Simulate a clone by incrementing claimant count + repository.incrementClaimaintCount(c2); + + // After incrementing, it should no longer be a truncation candidate + assertFalse(c2.isTruncationCandidate(), "Claim should NOT be a truncation candidate after incrementClaimaintCount"); + } + + @Test + @Timeout(60) + public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() throws IOException, InterruptedException { + // We need to create our own repository that overrides getContainerUsableSpace to simulate disk pressure + shutdown(); + + final FileSystemRepository localRepo = new FileSystemRepository(nifiProperties) { + @Override + public long getContainerUsableSpace(final String containerName) { + return 0; // Extreme disk pressure + } + + @Override + protected boolean isArchiveClearedOnLastRun(final String containerName) { + return true; + } + }; + + try { + final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager(); + localRepo.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepo.purge(); + + final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + + // Create C1 (small) then C2 (large) on the same ResourceClaim + final ContentClaim c1 = localRepo.create(false); + final byte[] smallData = "Hello World - small claim data".getBytes(StandardCharsets.UTF_8); + try (final OutputStream out = localRepo.write(c1)) { + out.write(smallData); + } + + final ContentClaim c2 = localRepo.create(false); + final byte[] largeData = new byte[(int) maxClaimLength + 4096]; + new Random().nextBytes(largeData); + try (final OutputStream 
out = localRepo.write(c2)) { + out.write(largeData); + } + + assertTrue(c2.isTruncationCandidate(), "C2 should be a truncation candidate"); + + // Both claims should share the same resource claim + assertEquals(c1.getResourceClaim(), c2.getResourceClaim(), "Both claims should share the same ResourceClaim"); + + // Get the file path + final Path filePath = getPath(localRepo, c1); + assertNotNull(filePath); + final long originalSize = Files.size(filePath); + assertTrue(originalSize > maxClaimLength, "File should be larger than maxAppendableClaimLength"); + + // Decrement claimant count for C2 to 0 (C1 still holds a reference) + localClaimManager.decrementClaimantCount(c2.getResourceClaim()); + + // Mark C2 as truncatable + localClaimManager.markTruncatable(c2); + + // Wait for the TruncateClaims background task to truncate the file. Poll the file size until it shrinks. + final long expectedTruncatedSize = c2.getOffset(); + while (Files.size(filePath) != expectedTruncatedSize) { + Thread.sleep(100L); + } + + // Verify C1's data is still fully readable + try (final InputStream in = localRepo.read(c1)) { + final byte[] readData = readFully(in, smallData.length); + assertArrayEquals(smallData, readData, "C1's data should still be fully readable after truncation"); + } + } finally { + localRepo.shutdown(); + } + } + + @Test + @Timeout(60) + public void testTruncateNotActiveWhenDiskNotPressured() throws IOException, InterruptedException { + // Create repository with ample disk space + shutdown(); + + final FileSystemRepository localRepo = new FileSystemRepository(nifiProperties) { + @Override + public long getContainerUsableSpace(final String containerName) { + return Long.MAX_VALUE; // Plenty of space + } + + @Override + protected boolean isArchiveClearedOnLastRun(final String containerName) { + return true; + } + }; + + try { + final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager(); + localRepo.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepo.purge(); + + final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + + final ContentClaim c1 = localRepo.create(false); + try (final OutputStream out = localRepo.write(c1)) { + out.write(new byte[100]); + } + + final ContentClaim c2 = localRepo.create(false); + try (final OutputStream out = localRepo.write(c2)) { + out.write(new byte[(int) maxClaimLength + 4096]); + } + + assertTrue(c2.isTruncationCandidate()); + + final Path filePath = getPath(localRepo, c1); + final long originalSize = Files.size(filePath); + + localClaimManager.decrementClaimantCount(c2.getResourceClaim()); + localClaimManager.markTruncatable(c2); + + Thread.sleep(3000L); + assertEquals(originalSize, Files.size(filePath), "File should NOT have been truncated when disk space is ample"); + } finally { + localRepo.shutdown(); + } + } + + @Test + @Timeout(90) + public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() throws IOException, InterruptedException { + // Create a repository where disk pressure can be toggled + shutdown(); + + final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE); + final FileSystemRepository localRepo = new FileSystemRepository(nifiProperties) { + @Override + public long getContainerUsableSpace(final String containerName) { + return usableSpace.get(); + } + + @Override + protected boolean isArchiveClearedOnLastRun(final String containerName) { + return true; + } + }; + + try { + final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager(); + localRepo.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepo.purge(); + + final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + + // Create C1 (small) then C2 (large) on the same ResourceClaim + final ContentClaim c1 = 
localRepo.create(false); + try (final OutputStream out = localRepo.write(c1)) { + out.write(new byte[100]); + } + + final ContentClaim c2 = localRepo.create(false); + try (final OutputStream out = localRepo.write(c2)) { + out.write(new byte[(int) maxClaimLength + 4096]); + } + + assertTrue(c2.isTruncationCandidate()); + assertEquals(c1.getResourceClaim(), c2.getResourceClaim()); + + final Path filePath = getPath(localRepo, c1); + final long originalSize = Files.size(filePath); + + localClaimManager.decrementClaimantCount(c2.getResourceClaim()); + localClaimManager.markTruncatable(c2); + + // Wait for at least one run of the background task with NO pressure. + // File should NOT be truncated. + Thread.sleep(3_000); + assertEquals(originalSize, Files.size(filePath), "File should not have been truncated while disk pressure is off"); + + // Now turn on disk pressure + usableSpace.set(0); + + // Wait for the next background task run to truncate the file + final long expectedTruncatedSize = c2.getOffset(); + while (Files.size(filePath) != expectedTruncatedSize) { + Thread.sleep(100L); + } + + // Verify C1's data is still readable + try (final InputStream in = localRepo.read(c1)) { + assertNotNull(in); + } + } finally { + localRepo.shutdown(); + } + } + private byte[] readFully(final InputStream inStream, final int size) throws IOException { final ByteArrayOutputStream baos = new ByteArrayOutputStream(size); int len; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 9d6118e8ebe6..8dd97cd59757 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -810,4 +810,289 @@ public String changePartitionName(String swapLocation, String newPartitionName) return swapLocation; } } + + // ========================================================================= + // Truncation Feature: Helpers + // ========================================================================= + + /** + * Creates a mock queue + connection + queueProvider wired together, suitable for runtime truncation tests. + * Returns [claimManager, queueProvider, queue]. + */ + private record RuntimeRepoContext(StandardResourceClaimManager claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) { + } + + private RuntimeRepoContext createRuntimeRepoContext() { + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + final TestQueueProvider queueProvider = new TestQueueProvider(); + final Connection connection = Mockito.mock(Connection.class); + when(connection.getIdentifier()).thenReturn("1234"); + when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); + final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class); + when(queue.getIdentifier()).thenReturn("1234"); + when(connection.getFlowFileQueue()).thenReturn(queue); + queueProvider.addConnection(connection); + return new RuntimeRepoContext(claimManager, queueProvider, queue); + } + + private StandardContentClaim createClaim(final ResourceClaim rc, final long offset, final long length, final boolean truncationCandidate) { + final StandardContentClaim claim = new StandardContentClaim(rc, offset); + claim.setLength(length); + if (truncationCandidate) { + claim.setTruncationCandidate(true); + } + return claim; + } + + private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository repo, final FlowFileQueue queue, + final ContentClaim claim) throws IOException { + final 
FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", UUID.randomUUID().toString()) + .contentClaim(claim) + .build(); + + final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(queue); + createRecord.setWorking(flowFile, false); + createRecord.setDestination(queue); + repo.updateRepository(List.of(createRecord)); + + final StandardRepositoryRecord deleteRecord = new StandardRepositoryRecord(queue, flowFile); + deleteRecord.markForDelete(); + repo.updateRepository(List.of(deleteRecord)); + } + + /** + * Writes FlowFiles (one per claim) to a new repo, closes it, then recovers into a fresh repo + * and returns the recovered FlowFileRecords. + */ + private List writeAndRecover(final ContentClaim... claims) throws IOException { + final ResourceClaimManager writeClaimManager = new StandardResourceClaimManager(); + final TestQueueProvider writeQueueProvider = new TestQueueProvider(); + final Connection writeConnection = Mockito.mock(Connection.class); + when(writeConnection.getIdentifier()).thenReturn("1234"); + when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); + final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager(); + final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B"); + when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue); + writeQueueProvider.addConnection(writeConnection); + + try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { + repo.initialize(writeClaimManager); + repo.loadFlowFiles(writeQueueProvider); + + final List records = new ArrayList<>(); + for (int i = 0; i < claims.length; i++) { + final FlowFileRecord ff = new StandardFlowFileRecord.Builder() + .id(i + 1L) + .addAttribute("uuid", "11111111-1111-1111-1111-" + String.format("%012d", i + 1)) + .contentClaim(claims[i]) + .build(); + final StandardRepositoryRecord rec = 
new StandardRepositoryRecord(writeQueue); + rec.setWorking(ff, false); + rec.setDestination(writeQueue); + records.add(rec); + } + repo.updateRepository(records); + } + + // Recover + final List recovered = new ArrayList<>(); + final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class); + when(recoveryQueue.getIdentifier()).thenReturn("1234"); + doAnswer(invocation -> { + recovered.add((FlowFileRecord) invocation.getArguments()[0]); + return null; + }).when(recoveryQueue).put(any(FlowFileRecord.class)); + + final Connection recoveryConnection = Mockito.mock(Connection.class); + when(recoveryConnection.getIdentifier()).thenReturn("1234"); + when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue); + final TestQueueProvider recoveryQueueProvider = new TestQueueProvider(); + recoveryQueueProvider.addConnection(recoveryConnection); + + try (final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(niFiProperties)) { + repo2.initialize(new StandardResourceClaimManager()); + repo2.loadFlowFiles(recoveryQueueProvider); + } + + return recovered; + } + + private FlowFileRecord findRecoveredByOffset(final List recovered, final long offset) { + return recovered.stream() + .filter(ff -> ff.getContentClaim() != null && ff.getContentClaim().getOffset() == offset) + .findFirst() + .orElse(null); + } + + // ========================================================================= + // Truncation Feature: Runtime Tests + // ========================================================================= + + @Test + public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() throws IOException { + final RuntimeRepoContext ctx = createRuntimeRepoContext(); + final ResourceClaim rc = ctx.claimManager().newResourceClaim("container", "section", "1", false, false); + ctx.claimManager().incrementClaimantCount(rc); + ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that after delete decrement it stays > 0 (not destructable) + final 
StandardContentClaim contentClaim = createClaim(rc, 1024L, 5_000_000L, true); + + try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { + repo.initialize(ctx.claimManager()); + repo.loadFlowFiles(ctx.queueProvider()); + createAndDeleteFlowFile(repo, ctx.queue(), contentClaim); + repo.checkpoint(); + } + + final List truncated = new ArrayList<>(); + ctx.claimManager().drainTruncatableClaims(truncated, 100); + assertTrue(truncated.contains(contentClaim), "Truncatable claim should have been routed to the truncation queue"); + } + + @Test + public void testDestructableClaimTakesPriorityOverTruncatable() throws IOException { + final RuntimeRepoContext ctx = createRuntimeRepoContext(); + final ResourceClaim rc = ctx.claimManager().newResourceClaim("container", "section", "1", false, false); + ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will reach 0 after delete + final StandardContentClaim contentClaim = createClaim(rc, 1024L, 5_000_000L, true); + + try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { + repo.initialize(ctx.claimManager()); + repo.loadFlowFiles(ctx.queueProvider()); + createAndDeleteFlowFile(repo, ctx.queue(), contentClaim); + repo.checkpoint(); + } + + final List destructed = new ArrayList<>(); + ctx.claimManager().drainDestructableClaims(destructed, 100); + assertTrue(destructed.contains(rc), "Resource claim should be destructable"); + + final List truncated = new ArrayList<>(); + ctx.claimManager().drainTruncatableClaims(truncated, 100); + assertFalse(truncated.contains(contentClaim), "Truncatable claim should NOT be in truncation queue when resource claim is destructable"); + } + + @Test + public void testUpdateRecordOriginalClaimQueuedForTruncation() throws IOException { + final RuntimeRepoContext ctx = createRuntimeRepoContext(); + + final ResourceClaim rc1 = ctx.claimManager().newResourceClaim("container", "section", "1", false, false); + 
ctx.claimManager().incrementClaimantCount(rc1); + ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it stays > 0 after decrement + final StandardContentClaim originalClaim = createClaim(rc1, 2048L, 5_000_000L, true); + + final ResourceClaim rc2 = ctx.claimManager().newResourceClaim("container", "section", "2", false, false); + ctx.claimManager().incrementClaimantCount(rc2); + final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L, false); + + final FlowFileRecord originalFlowFile = new StandardFlowFileRecord.Builder() + .id(1L) + .addAttribute("uuid", UUID.randomUUID().toString()) + .contentClaim(originalClaim) + .build(); + + try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { + repo.initialize(ctx.claimManager()); + repo.loadFlowFiles(ctx.queueProvider()); + + final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(ctx.queue()); + createRecord.setWorking(originalFlowFile, false); + createRecord.setDestination(ctx.queue()); + repo.updateRepository(List.of(createRecord)); + + final FlowFileRecord updatedFlowFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(originalFlowFile) + .contentClaim(newClaim) + .build(); + final StandardRepositoryRecord updateRecord = new StandardRepositoryRecord(ctx.queue(), originalFlowFile); + updateRecord.setWorking(updatedFlowFile, true); + updateRecord.setDestination(ctx.queue()); + repo.updateRepository(List.of(updateRecord)); + repo.checkpoint(); + } + + final List truncated = new ArrayList<>(); + ctx.claimManager().drainTruncatableClaims(truncated, 100); + assertTrue(truncated.contains(originalClaim), "Original claim should have been queued for truncation after content change"); + } + + // ========================================================================= + // Truncation Feature: Recovery Tests + // ========================================================================= + + @Test + public void 
testRecoveryMarksTruncationCandidateForLargeTailClaim() throws IOException { + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L, false); + final StandardContentClaim largeClaim = createClaim(rc, 100L, 2_000_000L, false); + + final List recovered = writeAndRecover(smallClaim, largeClaim); + + final FlowFileRecord recoveredLargeFF = findRecoveredByOffset(recovered, 100L); + assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with the large claim"); + assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(), + "Large tail claim should be marked as truncation candidate after recovery"); + + final FlowFileRecord recoveredSmallFF = findRecoveredByOffset(recovered, 0L); + assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with the small claim"); + assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(), + "Small claim at offset 0 should NOT be a truncation candidate after recovery"); + } + + @Test + public void testRecoveryDoesNotMarkClonedClaim() throws IOException { + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim sharedClaim = createClaim(rc, 100L, 2_000_000L, false); + + // Two FlowFiles sharing the same claim (clone scenario) + final List recovered = writeAndRecover(sharedClaim, sharedClaim); + + for (final FlowFileRecord ff : recovered) { + if (ff.getContentClaim() != null) { + assertFalse(ff.getContentClaim().isTruncationCandidate(), + "Cloned/shared claim should NOT be a truncation candidate"); + } + } + } + + @Test + public void testRecoveryOnlyMarksTailClaim() throws IOException { + final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager(); + final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L, false); + final StandardContentClaim claim2 = createClaim(rc, 2_000_100L, 3_000_000L, false); + + final List recovered = writeAndRecover(claim1, claim2); + + final FlowFileRecord tailFF = findRecoveredByOffset(recovered, 2_000_100L); + assertNotNull(tailFF, "Should have recovered the tail claim FlowFile"); + assertTrue(tailFF.getContentClaim().isTruncationCandidate(), + "Only the tail claim should be a truncation candidate"); + + final FlowFileRecord nonTailFF = findRecoveredByOffset(recovered, 100L); + assertNotNull(nonTailFF, "Should have recovered the non-tail claim FlowFile"); + assertFalse(nonTailFF.getContentClaim().isTruncationCandidate(), + "Non-tail large claim should NOT be a truncation candidate"); + } + + @Test + public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws IOException { + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim smallClaim1 = createClaim(rc, 0L, 100L, false); + final StandardContentClaim largeClaim = createClaim(rc, 100L, 2_000_000L, false); + final StandardContentClaim smallClaim2 = createClaim(rc, 2_000_100L, 50L, false); + + final List recovered = writeAndRecover(smallClaim1, largeClaim, smallClaim2); + + for (final FlowFileRecord ff : recovered) { + if (ff.getContentClaim() != null) { + assertFalse(ff.getContentClaim().isTruncationCandidate(), + "No claim should be a truncation candidate because the large claim is not the tail; claim offset=" + ff.getContentClaim().getOffset()); + } + } + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java index 814d3e816427..780a44f36995 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java +++ b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java @@ -29,6 +29,7 @@ public final class StandardContentClaim implements ContentClaim, Comparable claimantCounts = new ConcurrentHashMap<>(); private final BlockingQueue destructableClaims = new LinkedBlockingQueue<>(50000); + private final BlockingQueue truncatableClaims = new LinkedBlockingQueue<>(100000); @Override public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) { @@ -161,6 +162,30 @@ public void markDestructable(final ResourceClaim claim) { } } + @Override + public void markTruncatable(final ContentClaim contentClaim) { + if (contentClaim == null) { + return; + } + + final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); + synchronized (resourceClaim) { + if (isDestructable(resourceClaim)) { + return; + } + + logger.debug("Marking {} as truncatable", contentClaim); + try { + if (!truncatableClaims.offer(contentClaim, 1, TimeUnit.MINUTES)) { + logger.debug("Unable to mark {} as truncatable because the queue is full.", contentClaim); + } + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + logger.debug("Interrupted while marking {} as truncatable", contentClaim, ie); + } + } + } + @Override public void drainDestructableClaims(final Collection destination, final int maxElements) { final int drainedCount = destructableClaims.drainTo(destination, maxElements); @@ -179,6 +204,12 @@ public void drainDestructableClaims(final Collection destination, 
} } + @Override + public void drainTruncatableClaims(final Collection destination, final int maxElements) { + final int drainedCount = truncatableClaims.drainTo(destination, maxElements); + logger.debug("Drained {} truncatable claims to {}", drainedCount, destination); + } + @Override public void purge() { claimantCounts.clear(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java index 7fb77b2739c8..dd36bdd230ac 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java @@ -21,12 +21,14 @@ import org.junit.jupiter.api.Timeout; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestStandardResourceClaimManager { @@ -56,4 +58,53 @@ public void testGetClaimantCountWhileMarkingDestructable() throws InterruptedExc manager.drainDestructableClaims(new ArrayList<>(), 1); assertSame(completedObject, future.get()); } + + @Test + public void testMarkTruncatableSkipsDestructableResourceClaim() { + final StandardResourceClaimManager manager = new StandardResourceClaimManager(); + + // Create a resource claim with claimant count 0 and mark it destructable + final ResourceClaim rc = 
manager.newResourceClaim("container", "section", "id1", false, false); + manager.markDestructable(rc); + + // Create a content claim on that resource claim + final StandardContentClaim contentClaim = new StandardContentClaim(rc, 0); + contentClaim.setLength(1024); + contentClaim.setTruncationCandidate(true); + + // markTruncatable should skip this because the resource claim is already destructable + manager.markTruncatable(contentClaim); + + // Drain truncatable claims - should be empty + final List truncated = new ArrayList<>(); + manager.drainTruncatableClaims(truncated, 10); + assertTrue(truncated.isEmpty(), "Truncatable claims should be empty because the resource claim is destructable"); + } + + @Test + public void testMarkTruncatableAndDrainRespectsMaxElements() { + final StandardResourceClaimManager manager = new StandardResourceClaimManager(); + + // Create 5 truncatable claims, each on a distinct resource claim with a positive claimant count + for (int i = 0; i < 5; i++) { + final ResourceClaim rc = manager.newResourceClaim("container", "section", "id-" + i, false, false); + // Give each resource claim a positive claimant count so it's not destructable + manager.incrementClaimantCount(rc); + + final StandardContentClaim cc = new StandardContentClaim(rc, 0); + cc.setLength(1024); + cc.setTruncationCandidate(true); + manager.markTruncatable(cc); + } + + // Drain with maxElements=3 + final List batch1 = new ArrayList<>(); + manager.drainTruncatableClaims(batch1, 3); + assertEquals(3, batch1.size(), "First drain should return exactly 3 claims"); + + // Drain again - should get remaining 2 + final List batch2 = new ArrayList<>(); + manager.drainTruncatableClaims(batch2, 10); + assertEquals(2, batch2.size(), "Second drain should return the remaining 2 claims"); + } } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java index 08c9fd2eca79..4b24f3af0459 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java @@ -260,6 +260,11 @@ public long getLength() { return resourceClaim.getLength(); } + @Override + public boolean isTruncationCandidate() { + return false; + } + @Override public int compareTo(final ContentClaim o) { return resourceClaim.compareTo(o.getResourceClaim()); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java new file mode 100644 index 000000000000..6e3648f5aca2 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.List; +import java.util.Random; +import java.util.Set; + +@DefaultSchedule(period = "10 mins") +public class GenerateTruncatableFlowFiles extends AbstractProcessor { + + static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder() + .name("Batch Count") + .description("The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). 
" + + "Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.") + .required(true) + .defaultValue("10") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor SMALL_FILE_SIZE = new PropertyDescriptor.Builder() + .name("Small File Size") + .description("Size of each small FlowFile in bytes") + .required(true) + .defaultValue("1 KB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + static final PropertyDescriptor LARGE_FILE_SIZE = new PropertyDescriptor.Builder() + .name("Large File Size") + .description("Size of each large FlowFile in bytes") + .required(true) + .defaultValue("10 MB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + static final PropertyDescriptor SMALL_FILES_PER_BATCH = new PropertyDescriptor.Builder() + .name("Small Files Per Batch") + .description("Number of small FlowFiles to generate per batch") + .required(true) + .defaultValue("9") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + return List.of(BATCH_COUNT, + SMALL_FILE_SIZE, + LARGE_FILE_SIZE, + SMALL_FILES_PER_BATCH); + } + + @Override + public Set getRelationships() { + return Set.of(REL_SUCCESS); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchCount = context.getProperty(BATCH_COUNT).asInteger(); + final Random random = new Random(); + final int smallFileSize = context.getProperty(SMALL_FILE_SIZE).asDataSize(DataUnit.B).intValue(); + final int largeFileSize = context.getProperty(LARGE_FILE_SIZE).asDataSize(DataUnit.B).intValue(); + final int smallFilesPerBatch = context.getProperty(SMALL_FILES_PER_BATCH).asInteger(); + + for 
(int batch = 0; batch < batchCount; batch++) { + // Generate small FlowFiles with priority = 10 (low priority, processed last by PriorityAttributePrioritizer) + for (int i = 0; i < smallFilesPerBatch; i++) { + createFlowFile(session, random, smallFileSize, "10"); + } + + // Generate one large FlowFile with priority = 1 (high priority, processed first by PriorityAttributePrioritizer) + createFlowFile(session, random, largeFileSize, "1"); + } + } + + private void createFlowFile(final ProcessSession session, final Random random, final int fileSize, final String priority) { + FlowFile flowFile = session.create(); + flowFile = session.putAttribute(flowFile, "priority", priority); + final byte[] data = new byte[fileSize]; + random.nextBytes(data); + flowFile = session.write(flowFile, out -> out.write(data)); + session.transfer(flowFile, REL_SUCCESS); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a12f954cb84a..d4a9b4c81cf8 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -29,6 +29,7 @@ org.apache.nifi.processors.tests.system.FakeProcessor org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor org.apache.nifi.processors.tests.system.GenerateAndCountCallbacks org.apache.nifi.processors.tests.system.GenerateFlowFile +org.apache.nifi.processors.tests.system.GenerateTruncatableFlowFiles org.apache.nifi.processors.tests.system.HoldInput org.apache.nifi.processors.tests.system.IngestFile 
org.apache.nifi.processors.tests.system.LoopFlowFile diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java new file mode 100644 index 000000000000..012739ca872d --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.tests.system.repositories; + +import org.apache.nifi.tests.system.NiFiInstance; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * System test that verifies the truncation feature works correctly after a NiFi restart. + *
+ * <p>
+ * During the first run, NiFi is configured with very conservative truncation settings (99% archive + * usage threshold), so truncation never activates. FlowFiles are generated but not deleted. + *
+ * </p>
+ *
+ * <p>
+ * NiFi is then stopped, reconfigured with aggressive truncation settings (1% archive usage threshold), + * and restarted. On recovery, {@code WriteAheadFlowFileRepository.restoreFlowFiles()} re-derives + * truncation candidates by analyzing the recovered FlowFiles' ContentClaims. After restart, the large + * FlowFiles are deleted, and the test verifies that the content repository files are truncated on disk. + *
+ * </p>
+ */ +public class ContentClaimTruncationAfterRestartIT extends NiFiSystemIT { + + @Override + protected Map getNifiPropertiesOverrides() { + // Phase 1: Conservative settings — truncation should NOT occur + final Map overrides = new HashMap<>(); + overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec"); + overrides.put("nifi.content.claim.max.appendable.size", "50 KB"); + // Very high archive threshold means no disk pressure, so truncation never activates + overrides.put("nifi.content.repository.archive.max.usage.percentage", "99%"); + overrides.put("nifi.content.repository.archive.cleanup.frequency", "1 sec"); + return overrides; + } + + @Override + protected boolean isAllowFactoryReuse() { + return false; + } + + @Test + public void testTruncationOccursAfterRestartWithRecoveredCandidates() throws NiFiClientException, IOException, InterruptedException { + // === Phase 1: Generate FlowFiles with conservative settings (no truncation) === + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateTruncatableFlowFiles"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + final Map generateProps = Map.of( + "Batch Count", "10", + "Small File Size", "1 KB", + "Large File Size", "10 MB", + "Small Files Per Batch", "9" + ); + getClientUtil().updateProcessorProperties(generate, generateProps); + + ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success"); + connection = getClientUtil().updateConnectionPrioritizer(connection, "PriorityAttributePrioritizer"); + connection = getClientUtil().updateConnectionBackpressure(connection, 10000, 100L * 1024 * 1024); + + // Generate all 100 FlowFiles (90 small @ 1 KB + 10 large @ 10 MB) + getClientUtil().startProcessor(generate); + waitForQueueCount(connection.getId(), 100); + getClientUtil().stopProcessor(generate); + getClientUtil().waitForStoppedProcessor(generate.getId()); + + // Verify the content repository is large 
— the 10 MB FlowFiles are on disk + final File contentRepoDir = new File(getNiFiInstance().getInstanceDirectory(), "content_repository"); + final long thresholdBytes = 1024 * 1024; // 1 MB + final long sizeBeforeRestart = getContentRepoSize(contentRepoDir); + assertTrue(sizeBeforeRestart > thresholdBytes, + "Content repository should be large before restart, but was " + sizeBeforeRestart + " bytes"); + + // === Phase 2: Stop NiFi, reconfigure for aggressive truncation, and restart === + + final NiFiInstance nifiInstance = getNiFiInstance(); + nifiInstance.stop(); + + // Switch archive threshold to 1% so truncation activates under disk pressure + nifiInstance.setProperties(Map.of( + "nifi.content.repository.archive.max.usage.percentage", "1%" + )); + + nifiInstance.start(true); + + // After restart, WriteAheadFlowFileRepository.restoreFlowFiles() should have re-derived + // that the 10 large tail claims are truncation candidates. + + // Run TerminateFlowFile 10 times. Due to PriorityAttributePrioritizer, the 10 large + // FlowFiles (priority=1) are dequeued first. + for (int i = 0; i < 10; i++) { + final ProcessorEntity terminateAfterRestart = getNifiClient().getProcessorClient().getProcessor(terminate.getId()); + getNifiClient().getProcessorClient().runProcessorOnce(terminateAfterRestart); + getClientUtil().waitForStoppedProcessor(terminateAfterRestart.getId()); + } + + waitForQueueCount(connection.getId(), 90); + + // Wait for the content repository files to be truncated. + // Before truncation: ~10 files of ~10 MB each = ~100 MB total. + // After truncation: ~10 files of ~9 KB each = ~90 KB total. 
+ waitFor(() -> { + try { + return getContentRepoSize(contentRepoDir) < thresholdBytes; + } catch (final Exception e) { + return false; + } + }); + + final long finalSize = getContentRepoSize(contentRepoDir); + assertTrue(finalSize < thresholdBytes, + "Content repository total size should be below " + thresholdBytes + " bytes after truncation, but was " + finalSize); + } + + private long getContentRepoSize(final File dir) { + if (dir == null || !dir.exists()) { + return 0; + } + + final File[] children = dir.listFiles(); + if (children == null) { + return 0L; + } + + long total = 0; + for (final File child : children) { + if (child.isDirectory()) { + if (child.getName().equals("archive")) { + continue; // Skip archive directories + } + + total += getContentRepoSize(child); + } else { + total += child.length(); + } + } + + return total; + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java new file mode 100644 index 000000000000..c8ae97087ff1 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.repositories; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * System test that verifies the end-to-end truncation feature. It generates FlowFiles with a pattern + * of 9 small (1 KB) + 1 large (10 MB) per batch, removes only the large FlowFiles via priority-based + * ordering, and then verifies that the content repository files are truncated on disk. 
+ */ +public class ContentClaimTruncationIT extends NiFiSystemIT { + + @Override + protected Map getNifiPropertiesOverrides() { + final Map overrides = new HashMap<>(); + // Use a short checkpoint interval so truncatable claims are flushed to the ResourceClaimManager promptly + overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec"); + overrides.put("nifi.content.repository.archive.cleanup.frequency", "1 sec"); + // Explicitly set the max appendable claim size (same as system test default, but explicit for clarity) + overrides.put("nifi.content.claim.max.appendable.size", "50 KB"); + // Set archive threshold extremely low so that truncation occurs quickly + overrides.put("nifi.content.repository.archive.max.usage.percentage", "1%"); + return overrides; + } + + @Override + protected boolean isAllowFactoryReuse() { + // Don't reuse the NiFi instance since we override checkpoint interval + return false; + } + + @Test + public void testLargeFlowFileTruncation() throws NiFiClientException, IOException, InterruptedException { + // Create the processors + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateTruncatableFlowFiles"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Configure GenerateTruncatableFlowFiles with 10 batches (100 FlowFiles total) + final Map generateProps = Map.of( + "Batch Count", "10", + "Small File Size", "1 KB", + "Large File Size", "10 MB", + "Small Files Per Batch", "9" + ); + getClientUtil().updateProcessorProperties(generate, generateProps); + + // Create connection with PriorityAttributePrioritizer and 100 MB backpressure + ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success"); + connection = getClientUtil().updateConnectionPrioritizer(connection, "PriorityAttributePrioritizer"); + connection = getClientUtil().updateConnectionBackpressure(connection, 10000, 100L * 1024 * 1024); + + // Start the generator and wait 
for 100 FlowFiles to be queued + getClientUtil().startProcessor(generate); + waitForQueueCount(connection.getId(), 100); + + // Stop the generator + getClientUtil().stopProcessor(generate); + getClientUtil().waitForStoppedProcessor(generate.getId()); + + // Run TerminateFlowFile 10 times. Due to PriorityAttributePrioritizer, + // the 10 large FlowFiles (priority=1) will be dequeued first. + for (int i = 0; i < 10; i++) { + getNifiClient().getProcessorClient().runProcessorOnce(terminate); + getClientUtil().waitForStoppedProcessor(terminate.getId()); + } + + // Wait for 90 FlowFiles remaining (the 10 large ones have been removed) + waitForQueueCount(connection.getId(), 90); + + // Wait for the content repository files to be truncated. + // Before truncation: ~10 files of ~10 MB each = ~100 MB total. + // After truncation: ~10 files of ~9 KB each = ~90 KB total. + // We set a generous threshold of 1 MB. + final File contentRepoDir = new File(getNiFiInstance().getInstanceDirectory(), "content_repository"); + final long thresholdBytes = 1024 * 1024; // 1 MB + + waitFor(() -> { + try { + final long totalSize = getContentRepoSize(contentRepoDir.toPath()); + return totalSize < thresholdBytes; + } catch (final IOException e) { + return false; + } + }); + + // Final assertion + final long finalSize = getContentRepoSize(contentRepoDir.toPath()); + assertTrue(finalSize < thresholdBytes, + "Content repository total size should be below " + thresholdBytes + " bytes after truncation, but was " + finalSize); + } + + /** + * Walks the content repository directory (excluding any "archive" subdirectories) + * and returns the total size of all regular files. 
+ */ + private long getContentRepoSize(final Path contentRepoPath) throws IOException { + if (!Files.exists(contentRepoPath)) { + return 0; + } + + final AtomicLong totalSize = new AtomicLong(0); + Files.walkFileTree(contentRepoPath, new SimpleFileVisitor<>() { + @Override + public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attrs) { + // Skip archive directories + if (dir.getFileName() != null && "archive".equals(dir.getFileName().toString())) { + return FileVisitResult.SKIP_SUBTREE; + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) { + totalSize.addAndGet(attrs.size()); + return FileVisitResult.CONTINUE; + } + }); + + return totalSize.get(); + } +} From 24a3e0bb12198747e242e33c3e5bfc4fdb2a89b0 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 13 Mar 2026 16:56:14 -0400 Subject: [PATCH 2/2] NIFI-15570: Addressed review feedback; some test cleanup --- .../repository/FileSystemRepository.java | 21 ++- .../WriteAheadFlowFileRepository.java | 8 +- .../repository/TestFileSystemRepository.java | 177 ++++++++---------- .../TestWriteAheadFlowFileRepository.java | 150 ++++++++------- .../claim/StandardResourceClaimManager.java | 2 +- .../system/GenerateTruncatableFlowFiles.java | 5 +- 6 files changed, 175 insertions(+), 188 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index dcef02615f5d..8987b6464f2c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -101,6 +101,8 
@@ public class FileSystemRepository implements ContentRepository { private final List containerNames; private final AtomicLong index; + // Executor handles: BinDestructableClaims, one ArchiveOrDestroyDestructableClaims per content repository container, + // TruncateClaims, and archive directory scanning tasks submitted during initialization. private final ScheduledExecutorService executor = new FlowEngine(6, "FileSystemRepository Workers", true); private final ConcurrentMap> reclaimable = new ConcurrentHashMap<>(); private final Map containerStateMap = new HashMap<>(); @@ -699,7 +701,7 @@ public int incrementClaimaintCount(final ContentClaim claim) { } if (claim.isTruncationCandidate() && claim instanceof final StandardContentClaim scc) { - LOG.debug("{} is a truncation candidate, but is being claimed again. Setting truncation candidate to false.", claim); + LOG.debug("{} is a truncation candidate, but is being claimed again. Setting truncation candidate to false", claim); scc.setTruncationCandidate(false); } @@ -1069,6 +1071,9 @@ public void run() { // If able, truncate those claims. Otherwise, save those claims in the Truncation Claim Manager to be truncated on the next run. // This prevents us from having a case where we could truncate a big claim but we don't because we're not yet running out of disk space, // but then we later start to run out of disk space and lost the opportunity to truncate that big claim. + // Loop to drain the entire queue in a single invocation rather than waiting for the next scheduled run. Because the default + // interval is 1 minute, waiting for the next run could delay truncation on a disk that is already under pressure and increases + // the risk of having too many claims that the queue overflows (in which case we would lose some optimization). 
while (true) { final List toTruncate = new ArrayList<>(); resourceClaimManager.drainTruncatableClaims(toTruncate, 10_000); @@ -1086,7 +1091,7 @@ private void truncateClaims(final List toTruncate, final Map new ArrayList<>()).add(claim); continue; } @@ -1111,7 +1116,7 @@ private boolean isTruncationActiveForContainer(final String container, final Map } if (!isArchiveClearedOnLastRun(container)) { - LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run.", container); + LOG.debug("Truncation is not active for container {} because the archive was not cleared on the last run", container); activationCache.put(container, false); return false; } @@ -1120,7 +1125,7 @@ private boolean isTruncationActiveForContainer(final String container, final Map try { usableSpace = getContainerUsableSpace(container); } catch (final IOException ioe) { - LOG.warn("Failed to determine usable space for container {}. Will not truncate claims for this container.", container, ioe); + LOG.warn("Failed to determine usable space for container {}. Will not truncate claims for this container", container, ioe); return false; } @@ -1153,7 +1158,7 @@ private void truncate(final ContentClaim claim) { // This is unlikely but can occur if the claim was truncatable and the underlying Resource Claim becomes // destructable. In this case, we may archive or delete the entire ResourceClaim. This is safe to ignore, // since it means the data is cleaned up anyway. 
- LOG.debug("Failed to truncate {} because file does not exist.", claim, nsfe); + LOG.debug("Failed to truncate {} because file [{}] does not exist", claim, path, nsfe); } catch (final IOException e) { LOG.warn("Failed to truncate {} to {} bytes", claim, claim.getOffset(), e); } @@ -2068,7 +2073,7 @@ private static class TruncationClaimManager { private static final int MAX_THRESHOLD = 100_000; private final Map> truncationClaims = new HashMap<>(); - public synchronized void addTruncationClaims(final String container, final List claim) { + synchronized void addTruncationClaims(final String container, final List claim) { final List contentClaims = truncationClaims.computeIfAbsent(container, c -> new ArrayList<>()); contentClaims.addAll(claim); @@ -2082,12 +2087,12 @@ public synchronized void addTruncationClaims(final String container, final List< } } - public synchronized List removeTruncationClaims(final String container) { + synchronized List removeTruncationClaims(final String container) { final List removed = truncationClaims.remove(container); return removed == null ? 
Collections.emptyList() : removed; } - public synchronized List removeTruncationClaims(final ResourceClaim resourceClaim) { + synchronized List removeTruncationClaims(final ResourceClaim resourceClaim) { final List contentClaims = truncationClaims.get(resourceClaim.getContainer()); if (contentClaims == null) { return Collections.emptyList(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 963f02e028a7..b886c062b2e0 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -159,6 +159,8 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE); final long maxAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + // Cap the truncation threshold at 1 MB so that claims larger than 1 MB are always eligible + // for truncation regardless of how large maxAppendableClaimSize is configured. truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength); final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX); @@ -852,11 +854,9 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException // If any Content Claim was determined to be truncatable, mark it as such now. 
for (final StandardContentClaim eligible : truncationEligibleClaims) { final ContentClaim latestForResource = latestContentClaimByResourceClaim.get(eligible.getResourceClaim()); - if (!Objects.equals(eligible, latestForResource)) { - continue; + if (Objects.equals(eligible, latestForResource)) { + eligible.setTruncationCandidate(true); } - - eligible.setTruncationCandidate(true); } // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 206ff60d3349..102b927af3ec 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -82,6 +82,7 @@ public class TestFileSystemRepository { private Path originalNifiPropertiesFile; private Path rootFile; private NiFiProperties nifiProperties; + private long maxClaimLength; @BeforeEach public void setup() throws IOException { @@ -93,6 +94,7 @@ public void setup() throws IOException { NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec" ); nifiProperties = NiFiProperties.createBasicNiFiProperties(originalNifiPropertiesFile.toString(), additionalProperties); + maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); repository = new FileSystemRepository(nifiProperties); claimManager = new StandardResourceClaimManager(); repository.initialize(new StandardContentRepositoryContext(claimManager, EventReporter.NO_OP)); @@ -149,7 +151,6 @@ public void testIsArchived() { @Timeout(30) public void 
testClaimsArchivedWhenMarkedDestructable() throws IOException, InterruptedException { final ContentClaim contentClaim = repository.create(false); - final long configuredAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); final Map containerPaths = nifiProperties.getContentRepositoryPaths(); assertEquals(1, containerPaths.size()); final String containerName = containerPaths.keySet().iterator().next(); @@ -158,7 +159,7 @@ public void testClaimsArchivedWhenMarkedDestructable() throws IOException, Inter long bytesWritten = 0L; final byte[] bytes = "Hello World".getBytes(StandardCharsets.UTF_8); - while (bytesWritten <= configuredAppendableClaimLength) { + while (bytesWritten <= maxClaimLength) { out.write(bytes); bytesWritten += bytes.length; } @@ -484,12 +485,9 @@ public void testRemoveDeletesFileIfNoClaimants() throws IOException { repository.incrementClaimaintCount(claim); final Path claimPath = getPath(claim); - final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize(); - final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue(); - // Create the file. try (final OutputStream out = repository.write(claim)) { - out.write(new byte[maxClaimLength]); + out.write(new byte[(int) maxClaimLength]); } int count = repository.decrementClaimantCount(claim); @@ -702,9 +700,7 @@ public void testRemoveWhileWritingToClaim() throws IOException { // write at least 1 MB to the output stream so that when we close the output stream // the repo won't keep the stream open. 
- final String maxAppendableClaimLength = nifiProperties.getMaxAppendableClaimSize(); - final int maxClaimLength = DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue(); - final byte[] buff = new byte[maxClaimLength]; + final byte[] buff = new byte[(int) maxClaimLength]; out.write(buff); out.write(buff); @@ -907,27 +903,24 @@ protected boolean archive(Path curPath) { @Test public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() throws IOException { - final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); - - // Create a small claim C1 at offset 0. Write less data than maxAppendableClaimLength so the ResourceClaim + // Create a small claim at offset 0. Write less data than maxAppendableClaimLength so the ResourceClaim // is recycled back to the writable queue. - final ContentClaim c1 = repository.create(false); + final ContentClaim smallClaim = repository.create(false); final byte[] smallData = new byte[100]; - try (final OutputStream out = repository.write(c1)) { + try (final OutputStream out = repository.write(smallClaim)) { out.write(smallData); } - // C1 should NOT be a truncation candidate (it's small) - assertFalse(c1.isTruncationCandidate(), "Small claim at offset 0 should not be a truncation candidate"); + assertFalse(smallClaim.isTruncationCandidate()); - // Now create C2 on potentially the same ResourceClaim, writing more than maxAppendableClaimLength to freeze - // the ResourceClaim. Because c1 was small and recycled, c2 will be at a non-zero offset on the same ResourceClaim. - final ContentClaim c2 = repository.create(false); + // Now create a large claim on potentially the same ResourceClaim, writing more than maxAppendableClaimLength + // to freeze the ResourceClaim. Because smallClaim was small and recycled, largeClaim will be at a non-zero + // offset on the same ResourceClaim. 
+ final ContentClaim largeClaim = repository.create(false); final byte[] largeData = new byte[(int) maxClaimLength + 1024]; - try (final OutputStream out = repository.write(c2)) { + try (final OutputStream out = repository.write(largeClaim)) { out.write(largeData); } - // C2 should be a truncation candidate: large and at a non-zero offset - assertTrue(c2.isTruncationCandidate(), "Large claim at non-zero offset should be a truncation candidate"); + assertTrue(largeClaim.isTruncationCandidate()); // Negative case: create a standalone large claim at offset 0 (fresh ResourceClaim) // To ensure a fresh ResourceClaim, write large data to all writable claims to exhaust them, @@ -936,7 +929,7 @@ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() throws IOEx ContentClaim offsetZeroClaim = null; for (int i = 0; i < 20; i++) { final ContentClaim candidate = repository.create(false); - if (candidate instanceof StandardContentClaim scc && scc.getOffset() == 0) { + if (candidate instanceof StandardContentClaim standardContentClaim && standardContentClaim.getOffset() == 0) { // Write large data that exceeds maxAppendableClaimLength try (final OutputStream out = repository.write(candidate)) { out.write(new byte[(int) maxClaimLength + 1024]); @@ -951,33 +944,29 @@ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() throws IOEx } } - assertNotNull(offsetZeroClaim, "Should have found a claim at offset 0"); - assertFalse(offsetZeroClaim.isTruncationCandidate(), "Large claim at offset 0 should NOT be a truncation candidate"); + assertNotNull(offsetZeroClaim); + assertFalse(offsetZeroClaim.isTruncationCandidate()); } @Test public void testIncrementClaimantCountClearsTruncationCandidate() throws IOException { - final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); - // Create a small claim to start a ResourceClaim, then a large claim to freeze it - final ContentClaim c1 = 
repository.create(false); - try (final OutputStream out = repository.write(c1)) { + final ContentClaim smallClaim = repository.create(false); + try (final OutputStream out = repository.write(smallClaim)) { out.write(new byte[100]); } - final ContentClaim c2 = repository.create(false); - try (final OutputStream out = repository.write(c2)) { + final ContentClaim largeClaim = repository.create(false); + try (final OutputStream out = repository.write(largeClaim)) { out.write(new byte[(int) maxClaimLength + 1024]); } - // c2 should be a truncation candidate - assertTrue(c2.isTruncationCandidate(), "Claim should be a truncation candidate before incrementClaimaintCount"); + assertTrue(largeClaim.isTruncationCandidate()); // Simulate a clone by incrementing claimant count - repository.incrementClaimaintCount(c2); + repository.incrementClaimaintCount(largeClaim); - // After incrementing, it should no longer be a truncation candidate - assertFalse(c2.isTruncationCandidate(), "Claim should NOT be a truncation candidate after incrementClaimaintCount"); + assertFalse(largeClaim.isTruncationCandidate()); } @Test @@ -986,7 +975,7 @@ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() throws IOE // We need to create our own repository that overrides getContainerUsableSpace to simulate disk pressure shutdown(); - final FileSystemRepository localRepo = new FileSystemRepository(nifiProperties) { + final FileSystemRepository localRepository = new FileSystemRepository(nifiProperties) { @Override public long getContainerUsableSpace(final String containerName) { return 0; // Extreme disk pressure @@ -1000,55 +989,53 @@ protected boolean isArchiveClearedOnLastRun(final String containerName) { try { final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager(); - localRepo.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); - localRepo.purge(); + localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepository.purge(); - final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); - - // Create C1 (small) then C2 (large) on the same ResourceClaim - final ContentClaim c1 = localRepo.create(false); + // Create a small claim then a large claim on the same ResourceClaim + final ContentClaim smallClaim = localRepository.create(false); final byte[] smallData = "Hello World - small claim data".getBytes(StandardCharsets.UTF_8); - try (final OutputStream out = localRepo.write(c1)) { + try (final OutputStream out = localRepository.write(smallClaim)) { out.write(smallData); } - final ContentClaim c2 = localRepo.create(false); + final ContentClaim largeClaim = localRepository.create(false); final byte[] largeData = new byte[(int) maxClaimLength + 4096]; new Random().nextBytes(largeData); - try (final OutputStream out = localRepo.write(c2)) { + try (final OutputStream out = localRepository.write(largeClaim)) { out.write(largeData); } - assertTrue(c2.isTruncationCandidate(), "C2 should be a truncation candidate"); + assertTrue(largeClaim.isTruncationCandidate()); // Both claims should share the same resource claim - assertEquals(c1.getResourceClaim(), c2.getResourceClaim(), "Both claims should share the same ResourceClaim"); + assertEquals(smallClaim.getResourceClaim(), largeClaim.getResourceClaim()); // Get the file path - final Path filePath = getPath(localRepo, c1); + final Path filePath = getPath(localRepository, smallClaim); assertNotNull(filePath); final long originalSize = Files.size(filePath); - assertTrue(originalSize > maxClaimLength, "File should be larger than maxAppendableClaimLength"); + assertTrue(originalSize > maxClaimLength); - // Decrement claimant count for C2 to 0 (C1 still holds a reference) - localClaimManager.decrementClaimantCount(c2.getResourceClaim()); + // Decrement claimant count for the large claim to 0 
(small claim still holds a reference) + localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim()); - // Mark C2 as truncatable - localClaimManager.markTruncatable(c2); + // Mark the large claim as truncatable + localClaimManager.markTruncatable(largeClaim); // Wait for the TruncateClaims background task to truncate the file. Poll the file size until it shrinks. - final long expectedTruncatedSize = c2.getOffset(); + final long expectedTruncatedSize = largeClaim.getOffset(); while (Files.size(filePath) != expectedTruncatedSize) { Thread.sleep(100L); } - // Verify C1's data is still fully readable - try (final InputStream in = localRepo.read(c1)) { + // Verify the small claim's data is still fully readable + try (final InputStream in = localRepository.read(smallClaim)) { final byte[] readData = readFully(in, smallData.length); - assertArrayEquals(smallData, readData, "C1's data should still be fully readable after truncation"); + assertArrayEquals(smallData, readData); } } finally { - localRepo.shutdown(); + localRepository.shutdown(); } } @@ -1058,7 +1045,7 @@ public void testTruncateNotActiveWhenDiskNotPressured() throws IOException, Inte // Create repository with ample disk space shutdown(); - final FileSystemRepository localRepo = new FileSystemRepository(nifiProperties) { + final FileSystemRepository localRepository = new FileSystemRepository(nifiProperties) { @Override public long getContainerUsableSpace(final String containerName) { return Long.MAX_VALUE; // Plenty of space @@ -1072,33 +1059,31 @@ protected boolean isArchiveClearedOnLastRun(final String containerName) { try { final StandardResourceClaimManager localClaimManager = new StandardResourceClaimManager(); - localRepo.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); - localRepo.purge(); - - final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepository.purge(); - final ContentClaim c1 = localRepo.create(false); - try (final OutputStream out = localRepo.write(c1)) { + final ContentClaim smallClaim = localRepository.create(false); + try (final OutputStream out = localRepository.write(smallClaim)) { out.write(new byte[100]); } - final ContentClaim c2 = localRepo.create(false); - try (final OutputStream out = localRepo.write(c2)) { + final ContentClaim largeClaim = localRepository.create(false); + try (final OutputStream out = localRepository.write(largeClaim)) { out.write(new byte[(int) maxClaimLength + 4096]); } - assertTrue(c2.isTruncationCandidate()); + assertTrue(largeClaim.isTruncationCandidate()); - final Path filePath = getPath(localRepo, c1); + final Path filePath = getPath(localRepository, smallClaim); final long originalSize = Files.size(filePath); - localClaimManager.decrementClaimantCount(c2.getResourceClaim()); - localClaimManager.markTruncatable(c2); + localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim()); + localClaimManager.markTruncatable(largeClaim); Thread.sleep(3000L); - assertEquals(originalSize, Files.size(filePath), "File should NOT have been truncated when disk space is ample"); + assertEquals(originalSize, Files.size(filePath)); } finally { - localRepo.shutdown(); + localRepository.shutdown(); } } @@ -1109,7 +1094,7 @@ public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() throws IOE shutdown(); final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE); - final FileSystemRepository localRepo = new FileSystemRepository(nifiProperties) { + final FileSystemRepository localRepository = new FileSystemRepository(nifiProperties) { @Override public long getContainerUsableSpace(final String containerName) { return usableSpace.get(); @@ -1123,62 +1108,60 @@ protected boolean isArchiveClearedOnLastRun(final String containerName) { try { final StandardResourceClaimManager 
localClaimManager = new StandardResourceClaimManager(); - localRepo.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); - localRepo.purge(); - - final long maxClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); + localRepository.initialize(new StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP)); + localRepository.purge(); - // Create C1 (small) then C2 (large) on the same ResourceClaim - final ContentClaim c1 = localRepo.create(false); - try (final OutputStream out = localRepo.write(c1)) { + // Create a small claim then a large claim on the same ResourceClaim + final ContentClaim smallClaim = localRepository.create(false); + try (final OutputStream out = localRepository.write(smallClaim)) { out.write(new byte[100]); } - final ContentClaim c2 = localRepo.create(false); - try (final OutputStream out = localRepo.write(c2)) { + final ContentClaim largeClaim = localRepository.create(false); + try (final OutputStream out = localRepository.write(largeClaim)) { out.write(new byte[(int) maxClaimLength + 4096]); } - assertTrue(c2.isTruncationCandidate()); - assertEquals(c1.getResourceClaim(), c2.getResourceClaim()); + assertTrue(largeClaim.isTruncationCandidate()); + assertEquals(smallClaim.getResourceClaim(), largeClaim.getResourceClaim()); - final Path filePath = getPath(localRepo, c1); + final Path filePath = getPath(localRepository, smallClaim); final long originalSize = Files.size(filePath); - localClaimManager.decrementClaimantCount(c2.getResourceClaim()); - localClaimManager.markTruncatable(c2); + localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim()); + localClaimManager.markTruncatable(largeClaim); // Wait for at least one run of the background task with NO pressure. // File should NOT be truncated. 
Thread.sleep(3_000); - assertEquals(originalSize, Files.size(filePath), "File should not have been truncated while disk pressure is off"); + assertEquals(originalSize, Files.size(filePath)); // Now turn on disk pressure usableSpace.set(0); // Wait for the next background task run to truncate the file - final long expectedTruncatedSize = c2.getOffset(); + final long expectedTruncatedSize = largeClaim.getOffset(); while (Files.size(filePath) != expectedTruncatedSize) { Thread.sleep(100L); } - // Verify C1's data is still readable - try (final InputStream in = localRepo.read(c1)) { + // Verify the small claim's data is still readable + try (final InputStream in = localRepository.read(smallClaim)) { assertNotNull(in); } } finally { - localRepo.shutdown(); + localRepository.shutdown(); } } private byte[] readFully(final InputStream inStream, final int size) throws IOException { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(size); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(size); int len; final byte[] buffer = new byte[size]; while ((len = inStream.read(buffer)) >= 0) { - baos.write(buffer, 0, len); + outputStream.write(buffer, 0, len); } - return baos.toByteArray(); + return outputStream.toByteArray(); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 8dd97cd59757..3a6453e31242 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -932,59 +932,59 @@ private FlowFileRecord findRecoveredByOffset(final List 
recovere @Test public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() throws IOException { - final RuntimeRepoContext ctx = createRuntimeRepoContext(); - final ResourceClaim rc = ctx.claimManager().newResourceClaim("container", "section", "1", false, false); - ctx.claimManager().incrementClaimantCount(rc); - ctx.claimManager().incrementClaimantCount(rc); // count = 2 so that after delete decrement it stays > 0 (not destructable) - final StandardContentClaim contentClaim = createClaim(rc, 1024L, 5_000_000L, true); + final RuntimeRepoContext context = createRuntimeRepoContext(); + final ResourceClaim resourceClaim = context.claimManager().newResourceClaim("container", "section", "1", false, false); + context.claimManager().incrementClaimantCount(resourceClaim); + context.claimManager().incrementClaimantCount(resourceClaim); // count = 2 so that after delete decrement it stays > 0 (not destructable) + final StandardContentClaim contentClaim = createClaim(resourceClaim, 1024L, 5_000_000L, true); try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { - repo.initialize(ctx.claimManager()); - repo.loadFlowFiles(ctx.queueProvider()); - createAndDeleteFlowFile(repo, ctx.queue(), contentClaim); + repo.initialize(context.claimManager()); + repo.loadFlowFiles(context.queueProvider()); + createAndDeleteFlowFile(repo, context.queue(), contentClaim); repo.checkpoint(); } final List truncated = new ArrayList<>(); - ctx.claimManager().drainTruncatableClaims(truncated, 100); - assertTrue(truncated.contains(contentClaim), "Truncatable claim should have been routed to the truncation queue"); + context.claimManager().drainTruncatableClaims(truncated, 100); + assertTrue(truncated.contains(contentClaim)); } @Test public void testDestructableClaimTakesPriorityOverTruncatable() throws IOException { - final RuntimeRepoContext ctx = createRuntimeRepoContext(); - final ResourceClaim rc = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false); - ctx.claimManager().incrementClaimantCount(rc); // count = 1 -- will reach 0 after delete - final StandardContentClaim contentClaim = createClaim(rc, 1024L, 5_000_000L, true); + final RuntimeRepoContext context = createRuntimeRepoContext(); + final ResourceClaim resourceClaim = context.claimManager().newResourceClaim("container", "section", "1", false, false); + context.claimManager().incrementClaimantCount(resourceClaim); // count = 1 -- will reach 0 after delete + final StandardContentClaim contentClaim = createClaim(resourceClaim, 1024L, 5_000_000L, true); try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { - repo.initialize(ctx.claimManager()); - repo.loadFlowFiles(ctx.queueProvider()); - createAndDeleteFlowFile(repo, ctx.queue(), contentClaim); + repo.initialize(context.claimManager()); + repo.loadFlowFiles(context.queueProvider()); + createAndDeleteFlowFile(repo, context.queue(), contentClaim); repo.checkpoint(); } final List destructed = new ArrayList<>(); - ctx.claimManager().drainDestructableClaims(destructed, 100); - assertTrue(destructed.contains(rc), "Resource claim should be destructable"); + context.claimManager().drainDestructableClaims(destructed, 100); + assertTrue(destructed.contains(resourceClaim)); final List truncated = new ArrayList<>(); - ctx.claimManager().drainTruncatableClaims(truncated, 100); - assertFalse(truncated.contains(contentClaim), "Truncatable claim should NOT be in truncation queue when resource claim is destructable"); + context.claimManager().drainTruncatableClaims(truncated, 100); + assertFalse(truncated.contains(contentClaim)); } @Test public void testUpdateRecordOriginalClaimQueuedForTruncation() throws IOException { - final RuntimeRepoContext ctx = createRuntimeRepoContext(); + final RuntimeRepoContext context = createRuntimeRepoContext(); - final ResourceClaim rc1 = 
ctx.claimManager().newResourceClaim("container", "section", "1", false, false); - ctx.claimManager().incrementClaimantCount(rc1); - ctx.claimManager().incrementClaimantCount(rc1); // count = 2 so it stays > 0 after decrement - final StandardContentClaim originalClaim = createClaim(rc1, 2048L, 5_000_000L, true); + final ResourceClaim originalResourceClaim = context.claimManager().newResourceClaim("container", "section", "1", false, false); + context.claimManager().incrementClaimantCount(originalResourceClaim); + context.claimManager().incrementClaimantCount(originalResourceClaim); // count = 2 so it stays > 0 after decrement + final StandardContentClaim originalClaim = createClaim(originalResourceClaim, 2048L, 5_000_000L, true); - final ResourceClaim rc2 = ctx.claimManager().newResourceClaim("container", "section", "2", false, false); - ctx.claimManager().incrementClaimantCount(rc2); - final StandardContentClaim newClaim = createClaim(rc2, 0L, 100L, false); + final ResourceClaim newResourceClaim = context.claimManager().newResourceClaim("container", "section", "2", false, false); + context.claimManager().incrementClaimantCount(newResourceClaim); + final StandardContentClaim newClaim = createClaim(newResourceClaim, 0L, 100L, false); final FlowFileRecord originalFlowFile = new StandardFlowFileRecord.Builder() .id(1L) @@ -993,28 +993,28 @@ public void testUpdateRecordOriginalClaimQueuedForTruncation() throws IOExceptio .build(); try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) { - repo.initialize(ctx.claimManager()); - repo.loadFlowFiles(ctx.queueProvider()); + repo.initialize(context.claimManager()); + repo.loadFlowFiles(context.queueProvider()); - final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(ctx.queue()); + final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(context.queue()); createRecord.setWorking(originalFlowFile, false); - createRecord.setDestination(ctx.queue()); 
+ createRecord.setDestination(context.queue()); repo.updateRepository(List.of(createRecord)); final FlowFileRecord updatedFlowFile = new StandardFlowFileRecord.Builder() .fromFlowFile(originalFlowFile) .contentClaim(newClaim) .build(); - final StandardRepositoryRecord updateRecord = new StandardRepositoryRecord(ctx.queue(), originalFlowFile); + final StandardRepositoryRecord updateRecord = new StandardRepositoryRecord(context.queue(), originalFlowFile); updateRecord.setWorking(updatedFlowFile, true); - updateRecord.setDestination(ctx.queue()); + updateRecord.setDestination(context.queue()); repo.updateRepository(List.of(updateRecord)); repo.checkpoint(); } final List truncated = new ArrayList<>(); - ctx.claimManager().drainTruncatableClaims(truncated, 100); - assertTrue(truncated.contains(originalClaim), "Original claim should have been queued for truncation after content change"); + context.claimManager().drainTruncatableClaims(truncated, 100); + assertTrue(truncated.contains(originalClaim)); } // ========================================================================= @@ -1024,36 +1024,33 @@ public void testUpdateRecordOriginalClaimQueuedForTruncation() throws IOExceptio @Test public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws IOException { final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); - final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); - final StandardContentClaim smallClaim = createClaim(rc, 0L, 100L, false); - final StandardContentClaim largeClaim = createClaim(rc, 100L, 2_000_000L, false); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim smallClaim = createClaim(resourceClaim, 0L, 100L, false); + final StandardContentClaim largeClaim = createClaim(resourceClaim, 100L, 2_000_000L, false); final List recovered = writeAndRecover(smallClaim, largeClaim); - 
final FlowFileRecord recoveredLargeFF = findRecoveredByOffset(recovered, 100L); - assertNotNull(recoveredLargeFF, "Should have recovered a FlowFile with the large claim"); - assertTrue(recoveredLargeFF.getContentClaim().isTruncationCandidate(), - "Large tail claim should be marked as truncation candidate after recovery"); + final FlowFileRecord recoveredLargeFlowFile = findRecoveredByOffset(recovered, 100L); + assertNotNull(recoveredLargeFlowFile); + assertTrue(recoveredLargeFlowFile.getContentClaim().isTruncationCandidate()); - final FlowFileRecord recoveredSmallFF = findRecoveredByOffset(recovered, 0L); - assertNotNull(recoveredSmallFF, "Should have recovered a FlowFile with the small claim"); - assertFalse(recoveredSmallFF.getContentClaim().isTruncationCandidate(), - "Small claim at offset 0 should NOT be a truncation candidate after recovery"); + final FlowFileRecord recoveredSmallFlowFile = findRecoveredByOffset(recovered, 0L); + assertNotNull(recoveredSmallFlowFile); + assertFalse(recoveredSmallFlowFile.getContentClaim().isTruncationCandidate()); } @Test public void testRecoveryDoesNotMarkClonedClaim() throws IOException { final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); - final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); - final StandardContentClaim sharedClaim = createClaim(rc, 100L, 2_000_000L, false); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim sharedClaim = createClaim(resourceClaim, 100L, 2_000_000L, false); // Two FlowFiles sharing the same claim (clone scenario) final List recovered = writeAndRecover(sharedClaim, sharedClaim); - for (final FlowFileRecord ff : recovered) { - if (ff.getContentClaim() != null) { - assertFalse(ff.getContentClaim().isTruncationCandidate(), - "Cloned/shared claim should NOT be a truncation candidate"); + for (final FlowFileRecord flowFile : 
recovered) { + if (flowFile.getContentClaim() != null) { + assertFalse(flowFile.getContentClaim().isTruncationCandidate()); } } } @@ -1061,38 +1058,39 @@ public void testRecoveryDoesNotMarkClonedClaim() throws IOException { @Test public void testRecoveryOnlyMarksTailClaim() throws IOException { final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); - final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); - final StandardContentClaim claim1 = createClaim(rc, 100L, 2_000_000L, false); - final StandardContentClaim claim2 = createClaim(rc, 2_000_100L, 3_000_000L, false); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim nonTailClaim = createClaim(resourceClaim, 100L, 2_000_000L, false); + final StandardContentClaim tailClaim = createClaim(resourceClaim, 2_000_100L, 3_000_000L, false); - final List recovered = writeAndRecover(claim1, claim2); + final List recovered = writeAndRecover(nonTailClaim, tailClaim); - final FlowFileRecord tailFF = findRecoveredByOffset(recovered, 2_000_100L); - assertNotNull(tailFF, "Should have recovered the tail claim FlowFile"); - assertTrue(tailFF.getContentClaim().isTruncationCandidate(), - "Only the tail claim should be a truncation candidate"); + final FlowFileRecord tailFlowFile = findRecoveredByOffset(recovered, 2_000_100L); + assertNotNull(tailFlowFile); + assertTrue(tailFlowFile.getContentClaim().isTruncationCandidate()); - final FlowFileRecord nonTailFF = findRecoveredByOffset(recovered, 100L); - assertNotNull(nonTailFF, "Should have recovered the non-tail claim FlowFile"); - assertFalse(nonTailFF.getContentClaim().isTruncationCandidate(), - "Non-tail large claim should NOT be a truncation candidate"); + final FlowFileRecord nonTailFlowFile = findRecoveredByOffset(recovered, 100L); + assertNotNull(nonTailFlowFile); + 
assertFalse(nonTailFlowFile.getContentClaim().isTruncationCandidate()); } @Test public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws IOException { final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); - final ResourceClaim rc = claimManager.newResourceClaim("container", "section", "1", false, false); - final StandardContentClaim smallClaim1 = createClaim(rc, 0L, 100L, false); - final StandardContentClaim largeClaim = createClaim(rc, 100L, 2_000_000L, false); - final StandardContentClaim smallClaim2 = createClaim(rc, 2_000_100L, 50L, false); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false); + final StandardContentClaim firstSmallClaim = createClaim(resourceClaim, 0L, 100L, false); + final StandardContentClaim largeClaim = createClaim(resourceClaim, 100L, 2_000_000L, false); + final StandardContentClaim secondSmallClaim = createClaim(resourceClaim, 2_000_100L, 50L, false); - final List recovered = writeAndRecover(smallClaim1, largeClaim, smallClaim2); + final List recovered = writeAndRecover(firstSmallClaim, largeClaim, secondSmallClaim); - for (final FlowFileRecord ff : recovered) { - if (ff.getContentClaim() != null) { - assertFalse(ff.getContentClaim().isTruncationCandidate(), - "No claim should be a truncation candidate because the large claim is not the tail; claim offset=" + ff.getContentClaim().getOffset()); - } + final List flowFilesWithClaims = recovered.stream() + .filter(flowFile -> flowFile.getContentClaim() != null) + .toList(); + + assertFalse(flowFilesWithClaims.isEmpty()); + for (final FlowFileRecord flowFile : flowFilesWithClaims) { + assertFalse(flowFile.getContentClaim().isTruncationCandidate(), + "No claim should be a truncation candidate because the large claim is not the tail; claim offset=" + flowFile.getContentClaim().getOffset()); } } } diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index 31b3f2d619e9..1e483fc25f38 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -177,7 +177,7 @@ public void markTruncatable(final ContentClaim contentClaim) { logger.debug("Marking {} as truncatable", contentClaim); try { if (!truncatableClaims.offer(contentClaim, 1, TimeUnit.MINUTES)) { - logger.debug("Unable to mark {} as truncatable because the queue is full.", contentClaim); + logger.info("Unable to mark {} as truncatable because maximum queue size [{}] reached", contentClaim, truncatableClaims.size()); } } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java index 6e3648f5aca2..c00ba93c6cea 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java @@ -36,8 +36,9 @@ public class GenerateTruncatableFlowFiles extends
AbstractProcessor { static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder() .name("Batch Count") - .description("The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). " - + "Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.") + .description(""" + The maximum number of batches to generate. Each batch produces 10 FlowFiles (9 small + 1 large). \ + Once this many batches have been generated, no more FlowFiles will be produced until the processor is stopped and restarted.""") .required(true) .defaultValue("10") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)