-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15570: Keep track of Content Claims where the last Claim in a Re… #10874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,9 @@ | |
| import org.apache.nifi.controller.repository.claim.ContentClaim; | ||
| import org.apache.nifi.controller.repository.claim.ResourceClaim; | ||
| import org.apache.nifi.controller.repository.claim.ResourceClaimManager; | ||
| import org.apache.nifi.controller.repository.claim.StandardContentClaim; | ||
| import org.apache.nifi.flowfile.attributes.CoreAttributes; | ||
| import org.apache.nifi.processor.DataUnit; | ||
| import org.apache.nifi.repository.schema.FieldCache; | ||
| import org.apache.nifi.util.FormatUtils; | ||
| import org.apache.nifi.util.NiFiProperties; | ||
|
|
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis | |
| private final List<File> flowFileRepositoryPaths = new ArrayList<>(); | ||
| private final ScheduledExecutorService checkpointExecutor; | ||
| private final int maxCharactersToCache; | ||
| private final long truncationThreshold; | ||
|
|
||
| private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null; | ||
| private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>()); | ||
|
|
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis | |
| // before the data is destroyed, it's okay because the data will be unknown to the Content Repository, so it will be destroyed | ||
| // on restart. | ||
| private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<>(); | ||
| private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> claimsAwaitingTruncation = new ConcurrentHashMap<>(); | ||
|
|
||
| /** | ||
| * default no args constructor for service loading only. | ||
|
|
@@ -143,6 +147,7 @@ public WriteAheadFlowFileRepository() { | |
| nifiProperties = null; | ||
| retainOrphanedFlowFiles = true; | ||
| maxCharactersToCache = 0; | ||
| truncationThreshold = Long.MAX_VALUE; | ||
| } | ||
|
|
||
| public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { | ||
|
|
@@ -153,6 +158,10 @@ public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) { | |
| retainOrphanedFlowFiles = orphanedFlowFileProperty == null || Boolean.parseBoolean(orphanedFlowFileProperty); | ||
|
|
||
| this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE); | ||
| final long maxAppendableClaimLength = DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), DataUnit.B).longValue(); | ||
| // Cap the truncation threshold at 1 MB so that claims larger than 1 MB are always eligible | ||
| // for truncation regardless of how large maxAppendableClaimSize is configured. | ||
| truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength); | ||
|
|
||
| final String directoryName = nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX); | ||
| flowFileRepositoryPaths.add(new File(directoryName)); | ||
|
|
@@ -445,33 +454,48 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord | |
| // The below code is not entirely thread-safe, but we are OK with that because the results aren't really harmful. | ||
| // Specifically, if two different threads call updateRepository with DELETE records for the same Content Claim, | ||
| // it's quite possible for claimant count to be 0 below, which results in two different threads adding the Content | ||
| // Claim to the 'claimsAwaitDestruction' map. As a result, we can call #markDestructable with the same ContentClaim | ||
| // Claim to the 'claimsAwaitingDestruction' map. As a result, we can call #markDestructable with the same ContentClaim | ||
| // multiple times, and the #markDestructable method is not necessarily idempotent. | ||
| // However, the result of this is that the FileSystem Repository may end up trying to remove the content multiple times. | ||
| // This does not, however, cause problems, as ContentRepository should handle this | ||
| // This does indicate that some refactoring should probably be performed, though, as this is not a very clean interface. | ||
| final Set<ResourceClaim> claimsToAdd = new HashSet<>(); | ||
| final Set<ResourceClaim> destructableClaims = new HashSet<>(); | ||
| final Set<ContentClaim> truncatableClaims = new HashSet<>(); | ||
|
|
||
| final Set<String> swapLocationsAdded = new HashSet<>(); | ||
| final Set<String> swapLocationsRemoved = new HashSet<>(); | ||
|
|
||
| for (final RepositoryRecord record : repositoryRecords) { | ||
| updateClaimCounts(record); | ||
|
|
||
| final ContentClaim contentClaim = record.getCurrentClaim(); | ||
| final boolean truncationCandidate = contentClaim != null && contentClaim.isTruncationCandidate(); | ||
| final boolean claimChanged = !Objects.equals(record.getOriginalClaim(), contentClaim); | ||
| if (record.getType() == RepositoryRecordType.DELETE) { | ||
| // For any DELETE record that we have, if claim is destructible, mark it so | ||
| if (record.getCurrentClaim() != null && isDestructable(record.getCurrentClaim())) { | ||
| claimsToAdd.add(record.getCurrentClaim().getResourceClaim()); | ||
| // For any DELETE record that we have, if claim is destructible or truncatable, mark it so | ||
| if (isDestructable(contentClaim)) { | ||
| destructableClaims.add(contentClaim.getResourceClaim()); | ||
| } else if (truncationCandidate) { | ||
| truncatableClaims.add(contentClaim); | ||
| } | ||
|
|
||
| // If the original claim is different than the current claim and the original claim is destructible, mark it so | ||
| if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getCurrentClaim()) && isDestructable(record.getOriginalClaim())) { | ||
| claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); | ||
| // If the original claim is different than the current claim and the original claim is destructible | ||
| // or truncatable, mark it so | ||
| if (claimChanged) { | ||
| if (isDestructable(record.getOriginalClaim())) { | ||
| destructableClaims.add(record.getOriginalClaim().getResourceClaim()); | ||
| } else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) { | ||
| truncatableClaims.add(record.getOriginalClaim()); | ||
| } | ||
| } | ||
| } else if (record.getType() == RepositoryRecordType.UPDATE) { | ||
| // if we have an update, and the original is no longer needed, mark original as destructible | ||
| if (record.getOriginalClaim() != null && record.getCurrentClaim() != record.getOriginalClaim() && isDestructable(record.getOriginalClaim())) { | ||
| claimsToAdd.add(record.getOriginalClaim().getResourceClaim()); | ||
| if (claimChanged) { | ||
| if (isDestructable(record.getOriginalClaim())) { | ||
| destructableClaims.add(record.getOriginalClaim().getResourceClaim()); | ||
| } else if (record.getOriginalClaim() != null && record.getOriginalClaim().isTruncationCandidate()) { | ||
| truncatableClaims.add(record.getOriginalClaim()); | ||
| } | ||
| } | ||
| } else if (record.getType() == RepositoryRecordType.SWAP_OUT) { | ||
| final String swapLocation = record.getSwapLocation(); | ||
|
|
@@ -484,13 +508,16 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord | |
| } | ||
| } | ||
|
|
||
| // Once the content claim counts have been updated for all records, collect any transient claims that are eligible for destruction | ||
| // Once the content claim counts have been updated for all records, collect any transient | ||
| // claims that are eligible for destruction or truncation | ||
| for (final RepositoryRecord record : repositoryRecords) { | ||
| final List<ContentClaim> transientClaims = record.getTransientClaims(); | ||
| if (transientClaims != null) { | ||
| for (final ContentClaim transientClaim : transientClaims) { | ||
| if (isDestructable(transientClaim)) { | ||
| claimsToAdd.add(transientClaim.getResourceClaim()); | ||
| destructableClaims.add(transientClaim.getResourceClaim()); | ||
| } else if (transientClaim.isTruncationCandidate()) { | ||
| truncatableClaims.add(transientClaim); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -504,19 +531,15 @@ protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecord | |
| } | ||
| } | ||
|
|
||
| if (!claimsToAdd.isEmpty()) { | ||
| // Get / Register a Set<ContentClaim> for the given Partition Index | ||
| final Integer partitionKey = Integer.valueOf(partitionIndex); | ||
| BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); | ||
| if (claimQueue == null) { | ||
| claimQueue = new LinkedBlockingQueue<>(); | ||
| final BlockingQueue<ResourceClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); | ||
| if (existingClaimQueue != null) { | ||
| claimQueue = existingClaimQueue; | ||
| } | ||
| } | ||
| if (!destructableClaims.isEmpty()) { | ||
| // Get / Register a Set<ResourceClaim> for the given Partition Index | ||
| final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>()); | ||
| claimQueue.addAll(destructableClaims); | ||
| } | ||
|
|
||
| claimQueue.addAll(claimsToAdd); | ||
| if (!truncatableClaims.isEmpty()) { | ||
| final BlockingQueue<ContentClaim> claimQueue = claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>()); | ||
| claimQueue.addAll(truncatableClaims); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -566,16 +589,24 @@ private static String getLocationSuffix(final String swapLocation) { | |
|
|
||
| @Override | ||
| public void onSync(final int partitionIndex) { | ||
| final BlockingQueue<ResourceClaim> claimQueue = claimsAwaitingDestruction.get(partitionIndex); | ||
| if (claimQueue == null) { | ||
| return; | ||
| final BlockingQueue<ResourceClaim> destructionClaimQueue = claimsAwaitingDestruction.get(partitionIndex); | ||
| if (destructionClaimQueue != null) { | ||
| final Set<ResourceClaim> claimsToDestroy = new HashSet<>(); | ||
| destructionClaimQueue.drainTo(claimsToDestroy); | ||
|
|
||
| for (final ResourceClaim claim : claimsToDestroy) { | ||
| markDestructable(claim); | ||
| } | ||
| } | ||
|
|
||
| final Set<ResourceClaim> claimsToDestroy = new HashSet<>(); | ||
| claimQueue.drainTo(claimsToDestroy); | ||
| final BlockingQueue<ContentClaim> truncationClaimQueue = claimsAwaitingTruncation.get(partitionIndex); | ||
| if (truncationClaimQueue != null) { | ||
| final Set<ContentClaim> claimsToTruncate = new HashSet<>(); | ||
| truncationClaimQueue.drainTo(claimsToTruncate); | ||
|
|
||
| for (final ResourceClaim claim : claimsToDestroy) { | ||
| markDestructable(claim); | ||
| for (final ContentClaim claim : claimsToTruncate) { | ||
| claimManager.markTruncatable(claim); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -589,6 +620,15 @@ public void onGlobalSync() { | |
| markDestructable(claim); | ||
| } | ||
| } | ||
|
|
||
| for (final BlockingQueue<ContentClaim> claimQueue : claimsAwaitingTruncation.values()) { | ||
| final Set<ContentClaim> claimsToTruncate = new HashSet<>(); | ||
| claimQueue.drainTo(claimsToTruncate); | ||
|
|
||
| for (final ContentClaim claim : claimsToTruncate) { | ||
| claimManager.markTruncatable(claim); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -723,6 +763,10 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException | |
| queueMap.put(queue.getIdentifier(), queue); | ||
| } | ||
|
|
||
| final Set<StandardContentClaim> truncationEligibleClaims = new HashSet<>(); | ||
| final Set<ContentClaim> forbiddenTruncationClaims = new HashSet<>(); | ||
| final Map<ResourceClaim, ContentClaim> latestContentClaimByResourceClaim = new HashMap<>(); | ||
|
|
||
| final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>(); | ||
| int numFlowFilesMissingQueue = 0; | ||
| long maxId = 0; | ||
|
|
@@ -748,6 +792,15 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException | |
| } | ||
|
|
||
| final ContentClaim claim = record.getContentClaim(); | ||
|
|
||
| // Track the latest Content Claim for each Resource Claim so that we can determine which claims are eligible for truncation. | ||
| if (claim != null) { | ||
| final ContentClaim latestContentClaim = latestContentClaimByResourceClaim.get(claim.getResourceClaim()); | ||
| if (latestContentClaim == null || claim.getOffset() > latestContentClaim.getOffset()) { | ||
| latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim); | ||
| } | ||
| } | ||
|
|
||
| final FlowFileQueue flowFileQueue = queueMap.get(queueId); | ||
| final boolean orphaned = flowFileQueue == null; | ||
| if (orphaned) { | ||
|
|
@@ -777,6 +830,18 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException | |
|
|
||
| continue; | ||
| } else if (claim != null) { | ||
| // If the claim exceeds the max appendable claim length on its own and doesn't start the Resource Claim, | ||
| // we will consider it to be eligible for truncation. However, if there are multiple FlowFiles sharing the | ||
| // same claim, we cannot truncate it because doing so would affect the other FlowFiles. | ||
| if (claim.getOffset() > 0 && claim.getLength() > truncationThreshold && claim instanceof final StandardContentClaim scc) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like the claim instance type should be checked first, or is this done after for a specific reason? On the other hand, could this ever be something other than the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should never be anything other than |
||
| if (forbiddenTruncationClaims.contains(claim) || truncationEligibleClaims.contains(scc)) { | ||
| truncationEligibleClaims.remove(scc); | ||
| forbiddenTruncationClaims.add(scc); | ||
| } else { | ||
| truncationEligibleClaims.add(scc); | ||
| } | ||
| } | ||
|
|
||
| claimManager.incrementClaimantCount(claim.getResourceClaim()); | ||
| } | ||
|
|
||
|
|
@@ -786,6 +851,14 @@ public long loadFlowFiles(final QueueProvider queueProvider) throws IOException | |
| // If recoveredRecords has been populated it need to be nulled out now because it is no longer useful and can be garbage collected. | ||
| recoveredRecords = null; | ||
|
|
||
| // If any Content Claim was determined to be truncatable, mark it as such now. | ||
| for (final StandardContentClaim eligible : truncationEligibleClaims) { | ||
| final ContentClaim latestForResource = latestContentClaimByResourceClaim.get(eligible.getResourceClaim()); | ||
| if (Objects.equals(eligible, latestForResource)) { | ||
| eligible.setTruncationCandidate(true); | ||
| } | ||
| } | ||
|
|
||
| // Set the AtomicLong to 1 more than the max ID so that calls to #getNextFlowFileSequence() will | ||
| // return the appropriate number. | ||
| flowFileSequenceGenerator.set(maxId + 1); | ||
|
|
@@ -852,7 +925,7 @@ public long getNextFlowFileSequence() { | |
| } | ||
|
|
||
| @Override | ||
| public long getMaxFlowFileIdentifier() throws IOException { | ||
| public long getMaxFlowFileIdentifier() { | ||
| // flowFileSequenceGenerator is 1 more than the MAX so that we can call #getAndIncrement on the AtomicLong | ||
| return flowFileSequenceGenerator.get() - 1; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be helpful to comment on the reason for the minimum value selected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is just a "magic number" ... if someone goes in and sets Max Appendable to something large like 10 MB, we still want to consider truncating smaller values. 1 MB seems reasonable.