diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java index 4d9e6bbeaf97..1207d27b7092 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/flow/synchronization/StandardVersionedComponentSynchronizer.java @@ -319,7 +319,7 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", versionedExternalFlow.getFlowContents()); final PropertyDecryptor decryptor = options.getPropertyDecryptor(); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, group.getAncestorServiceIds(), + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor(), decryptor::decrypt, options.getComponentComparisonIdLookup(), FlowComparatorVersionedStrategy.DEEP); final FlowComparison flowComparison = flowComparator.compare(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 1e6deb27ae08..7cbf14cfc95c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -64,9 +64,13 @@ import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.encrypt.PropertyEncryptor; import org.apache.nifi.flow.ExecutionEngine; +import org.apache.nifi.flow.ExternalControllerServiceReference; import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedControllerService; import org.apache.nifi.flow.VersionedExternalFlow; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedPropertyDescriptor; import org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer; import org.apache.nifi.flow.synchronization.VersionedFlowSynchronizationContext; import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods; @@ -2633,6 +2637,13 @@ public void removeControllerService(final ControllerServiceNode service) { } }); + // When an ancestor controller service is removed, any descendant versioned PG whose + // committed snapshot referenced that service needs its cached differences invalidated, + // even if no component currently references the deleted service (e.g., the processor + // was already switched to a different service before the old one was deleted). + findAllProcessGroups(pg -> pg.getVersionControlInformation() != null) + .forEach(ProcessGroup::onComponentModified); + scheduler.submitFrameworkTask(() -> stateManagerProvider.onComponentRemoved(service.getIdentifier())); removed = true; @@ -3758,6 +3769,7 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) { final FlowSnapshotContainer registrySnapshotContainer = flowRegistry.getFlowContents( FlowRegistryClientContextFactory.getAnonymousContext(), flowVersionLocation, false); final RegisteredFlowSnapshot registrySnapshot = registrySnapshotContainer.getFlowSnapshot(); + resolveExternalServiceReferences(registrySnapshot); final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents(); vci.setFlowSnapshot(registryFlow); } catch (final IOException | FlowRegistryException e) { @@ -3911,26 +3923,6 @@ public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final } } - @Override - public Set getAncestorServiceIds() { - final Set ancestorServiceIds; - ProcessGroup parentGroup = getParent(); - - if (parentGroup == null) { - ancestorServiceIds = Collections.emptySet(); - } else { - // We want to map the Controller Service to its Versioned Component ID, if it has one. - // If it does not have one, we want to generate it in the same way that our Flow Mapper does - // because this allows us to find the Controller Service when doing a Flow Diff. - ancestorServiceIds = parentGroup.getControllerServices(true).stream() - .map(cs -> cs.getVersionedComponentId().orElse( - NiFiRegistryFlowMapper.generateVersionedComponentId(cs.getIdentifier()))) - .collect(Collectors.toSet()); - } - - return ancestorServiceIds; - } - private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) { long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits(); @@ -3947,6 +3939,81 @@ private String generateUuid(final String propposedId, final String destinationGr return uuid.toString(); } + private void resolveExternalServiceReferences(final RegisteredFlowSnapshot snapshot) { + final Map externalRefs = snapshot.getExternalControllerServices(); + if (externalRefs == null || externalRefs.isEmpty()) { + return; + } + + final ProcessGroup parentGroup = getParent(); + if (parentGroup == null) { + return; + } + + final Map serviceNameToVersionedId = new HashMap<>(); + for (final ControllerServiceNode serviceNode : parentGroup.getControllerServices(true)) { + final String versionedId = serviceNode.getVersionedComponentId().orElse( + NiFiRegistryFlowMapper.generateVersionedComponentId(serviceNode.getIdentifier())); + serviceNameToVersionedId.put(serviceNode.getName(), versionedId); + } + + final Map foreignToLocalId = new HashMap<>(); + for (final Map.Entry entry : externalRefs.entrySet()) { + final String foreignId = entry.getKey(); + final String serviceName = entry.getValue().getName(); + final String localId = serviceNameToVersionedId.get(serviceName); + if (localId != null && !localId.equals(foreignId)) { + foreignToLocalId.put(foreignId, localId); + } + } + + if (!foreignToLocalId.isEmpty()) { + replaceExternalServiceIds(snapshot.getFlowContents(), foreignToLocalId); + } + } + + private void replaceExternalServiceIds(final VersionedProcessGroup group, final Map foreignToLocalId) { + if (group.getProcessors() != null) { + for (final VersionedProcessor processor : group.getProcessors()) { + replaceServicePropertyIds(processor.getProperties(), processor.getPropertyDescriptors(), foreignToLocalId); + } + } + + if (group.getControllerServices() != null) { + for (final VersionedControllerService service : group.getControllerServices()) { + replaceServicePropertyIds(service.getProperties(), service.getPropertyDescriptors(), foreignToLocalId); + } + } + + if (group.getProcessGroups() != null) { + for (final VersionedProcessGroup child : group.getProcessGroups()) { + replaceExternalServiceIds(child, foreignToLocalId); + } + } + } + + private void replaceServicePropertyIds(final Map properties, final Map descriptors, + final Map foreignToLocalId) { + if (properties == null || descriptors == null) { + return; + } + + for (final Map.Entry entry : properties.entrySet()) { + final String propertyValue = entry.getValue(); + if (propertyValue == null) { + continue; + } + + final VersionedPropertyDescriptor descriptor = descriptors.get(entry.getKey()); + if (descriptor != null && descriptor.getIdentifiesControllerService()) { + final String localId = foreignToLocalId.get(propertyValue); + if (localId != null) { + entry.setValue(localId); + } + } + } + } + private Set getModifications() { final StandardVersionControlInformation vci = versionControlInfo.get(); @@ -3975,7 +4042,7 @@ private Set getModifications() { final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup); final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot()); - final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(), + final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, new EvolvingDifferenceDescriptor(), encryptor::decrypt, VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final FlowComparison comparison = flowComparator.compare(); final Collection comparisonDifferences = comparison.getDifferences(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index cc53fe961dea..13b101501605 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -596,13 +596,6 @@ default CompletableFuture stopComponents() { */ Funnel findFunnel(String id); - /** - * Gets a collection of identifiers representing all ancestor controller services - * - * @return collection of ancestor controller service identifiers - */ - Set getAncestorServiceIds(); - /** * @param id of the Controller Service * @param includeDescendantGroups whether or not to include descendant process groups diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java index e0a7ce607fd4..95e1006b03d7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java @@ -504,7 +504,7 @@ private FlowComparison compareFlows(final DataFlow existingFlow, final DataFlow toSet(existingVersionedFlow.getRegistries()) ); - final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow, Collections.emptySet(), + final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow, differenceDescriptor, encryptor::decrypt, VersionedComponent::getInstanceIdentifier, FlowComparatorVersionedStrategy.DEEP); return flowComparator.compare(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index cfd0eeefe552..ccfccb822687 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -438,11 +438,6 @@ public Funnel findFunnel(final String id) { return null; } - @Override - public Set getAncestorServiceIds() { - return null; - } - @Override public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) { return serviceMap.get(id); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 940310d3e8c0..dea87d3b7254 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -5546,7 +5546,6 @@ public FlowComparisonEntity getVersionDifference(final String registryId, FlowVe final FlowComparator flowComparator = new StandardFlowComparator( new StandardComparableDataFlow("Flow A", flowContentsA), new StandardComparableDataFlow("Flow B", flowContentsB), - Collections.emptySet(), // Replacement of an external ControllerService is recognized as property change new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, @@ -5626,6 +5625,12 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) { versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion()); final FlowSnapshotContainer flowSnapshotContainer = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flowVersionLocation, true); + + // Resolve external controller service references by name so that cross-instance + // ID differences do not appear as phantom local modifications. + final String parentGroupId = processGroup.getParent() == null ? processGroup.getIdentifier() : processGroup.getParent().getIdentifier(); + controllerFacade.getControllerServiceResolver().resolveInheritedControllerServices(flowSnapshotContainer, parentGroupId, NiFiUserUtils.getNiFiUser()); + final RegisteredFlowSnapshot versionedFlowSnapshot = flowSnapshotContainer.getFlowSnapshot(); registryGroup = versionedFlowSnapshot.getFlowContents(); } catch (final IOException | FlowRegistryException e) { @@ -5638,8 +5643,7 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) { final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup); - final Set ancestorServiceIds = processGroup.getAncestorServiceIds(); - final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor(), + final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final FlowComparison flowComparison = flowComparator.compare(); @@ -5802,8 +5806,7 @@ public Set getComponentsAffectedByFlowUpdate(final Stri final ComparableDataFlow localFlow = new StandardComparableDataFlow("Current Flow", localContents); final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents()); - final Set ancestorServiceIds = group.getAncestorServiceIds(); - final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor(), + final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP); final FlowComparison comparison = flowComparator.compare(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java index 5416a41a9f31..20e27fdef450 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java @@ -317,8 +317,6 @@ public void testGetComponentsAffectedByFlowUpdate_WithNewStatelessProcessGroup_R final String groupId = UUID.randomUUID().toString(); final ProcessGroup processGroup = mock(ProcessGroup.class); when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup); - when(processGroup.getAncestorServiceIds()).thenReturn(Collections.emptySet()); - final FlowManager flowManager = mock(FlowManager.class); final ExtensionManager extensionManager = mock(ExtensionManager.class); when(flowController.getFlowManager()).thenReturn(flowManager); @@ -352,7 +350,6 @@ public void testGetComponentsAffectedByFlowUpdate_WithNewStatelessProcessGroup_R final FlowComparator flowComparator = new StandardFlowComparator( localFlow, proposedFlow, - Collections.emptySet(), new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java index 875d595b456c..02902225a0c0 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java @@ -70,18 +70,16 @@ public class StandardFlowComparator implements FlowComparator { private final ComparableDataFlow flowA; private final ComparableDataFlow flowB; - private final Set externallyAccessibleServiceIds; private final DifferenceDescriptor differenceDescriptor; private final Function propertyDecryptor; private final Function idLookup; private final FlowComparatorVersionedStrategy flowComparatorVersionedStrategy; - public StandardFlowComparator(final ComparableDataFlow flowA, final ComparableDataFlow flowB, final Set externallyAccessibleServiceIds, + public StandardFlowComparator(final ComparableDataFlow flowA, final ComparableDataFlow flowB, final DifferenceDescriptor differenceDescriptor, final Function propertyDecryptor, final Function idLookup, final FlowComparatorVersionedStrategy flowComparatorVersionedStrategy) { this.flowA = flowA; this.flowB = flowB; - this.externallyAccessibleServiceIds = externallyAccessibleServiceIds; this.differenceDescriptor = differenceDescriptor; this.propertyDecryptor = propertyDecryptor; this.idLookup = idLookup; @@ -427,21 +425,6 @@ private void compareProperties(final VersionedComponent componentA, final Versio differences.add(difference(DifferenceType.PROPERTY_REMOVED, componentA, componentB, key, displayName, valueA, valueB)); } } else if (valueA != null && !valueA.equals(valueB)) { - // If the property in Flow A references a Controller Service that is not available in the flow - // and the property in Flow B references a Controller Service that is available in its environment - // but not part of the Versioned Flow, then we do not want to consider this to be a Flow Difference. - // This is typically the case when a flow is versioned in one instance, referencing an external Controller Service, - // and then imported into another NiFi instance. When imported, the property does not point to any existing Controller - // Service, and the user must then point the property an existing Controller Service. We don't want to consider the - // flow as having changed, since it is an environment-specific change (similar to how we handle variables). - if (descriptor != null && descriptor.getIdentifiesControllerService()) { - final boolean accessibleA = externallyAccessibleServiceIds.contains(valueA); - final boolean accessibleB = externallyAccessibleServiceIds.contains(valueB); - if (!accessibleA && accessibleB) { - return; - } - } - final boolean aParameterized = isParameterReference(valueA); final boolean bParameterized = isParameterReference(valueB); if (aParameterized && !bParameterized) { diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java index b068b43e780e..a14a7cb7fe96 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java @@ -28,6 +28,7 @@ import org.apache.nifi.flow.VersionedParameterContext; import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedPropertyDescriptor; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,7 +64,7 @@ public void setup() { final Function decryptor = encryptedToDecrypted::get; final ComparableDataFlow flowA = new StandardComparableDataFlow("Flow A", new VersionedProcessGroup()); final ComparableDataFlow flowB = new StandardComparableDataFlow("Flow B", new VersionedProcessGroup()); - comparator = new StandardFlowComparator(flowA, flowB, Collections.emptySet(), + comparator = new StandardFlowComparator(flowA, flowB, new StaticDifferenceDescriptor(), decryptor, VersionedComponent::getInstanceIdentifier, FlowComparatorVersionedStrategy.SHALLOW); } @@ -223,7 +224,7 @@ public void testDeepStrategyWithChildPGs() { // Testing when a child PG is added and the child PG contains components - comparator = new StandardFlowComparator(flowA, flowB, Collections.emptySet(), + comparator = new StandardFlowComparator(flowA, flowB, new StaticDifferenceDescriptor(), decryptor, VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final Set diffShallowChildPgAdded = comparator.compare().getDifferences(); @@ -232,7 +233,7 @@ public void testDeepStrategyWithChildPGs() { .anyMatch(difference -> difference.getDifferenceType() == DifferenceType.COMPONENT_ADDED && difference.getComponentB().getComponentType() == ComponentType.PROCESS_GROUP)); - comparator = new StandardFlowComparator(flowA, flowB, Collections.emptySet(), + comparator = new StandardFlowComparator(flowA, flowB, new StaticDifferenceDescriptor(), decryptor, VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP); final Set diffDeepChildPgAdded = comparator.compare().getDifferences(); assertEquals(15, diffDeepChildPgAdded.size()); @@ -257,7 +258,7 @@ public void testDeepStrategyWithChildPGs() { // Testing when a child PG is removed and the child PG contains components - comparator = new StandardFlowComparator(flowB, flowA, Collections.emptySet(), + comparator = new StandardFlowComparator(flowB, flowA, new StaticDifferenceDescriptor(), decryptor, VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final Set diffShallowChildPgRemoved = comparator.compare().getDifferences(); @@ -266,7 +267,7 @@ public void testDeepStrategyWithChildPGs() { .anyMatch(difference -> difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && difference.getComponentA().getComponentType() == ComponentType.PROCESS_GROUP)); - comparator = new StandardFlowComparator(flowB, flowA, Collections.emptySet(), + comparator = new StandardFlowComparator(flowB, flowA, new StaticDifferenceDescriptor(), decryptor, VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP); final Set diffDeepChildPgRemoved = comparator.compare().getDifferences(); assertEquals(4, diffDeepChildPgRemoved.size()); @@ -340,7 +341,6 @@ public void testScheduledStateChangeDetectedForProcessorInRegularNestedGroup() { final StandardFlowComparator testComparator = new StandardFlowComparator( registryFlow, localFlow, - Collections.emptySet(), new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, @@ -407,7 +407,6 @@ public void testNoChangesDetectedForSeparatelyVersionedNestedGroupWhenVersionsMa final StandardFlowComparator testComparator = new StandardFlowComparator( registryFlow, localFlow, - Collections.emptySet(), new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, @@ -478,7 +477,6 @@ public void testNestedVersionedPGWithDifferentIdentifiersButSameVersion() { final StandardFlowComparator shallowComparator = new StandardFlowComparator( registryFlow, localFlow, - Collections.emptySet(), new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, @@ -551,7 +549,6 @@ public void testAddProcessorToParentOnly() { final StandardFlowComparator comparator = new StandardFlowComparator( registryFlow, localFlow, - Collections.emptySet(), new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, @@ -622,7 +619,7 @@ public void testAddNonVersionedNestedPGWithProcessorShowsBothAdditions() { // DEEP strategy: both PG2 addition AND processor addition should be reported final StandardFlowComparator deepComparator = new StandardFlowComparator( - registryFlow, localFlow, Collections.emptySet(), new StaticDifferenceDescriptor(), + registryFlow, localFlow, new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP); final Set deepDifferences = deepComparator.compare().getDifferences(); @@ -638,7 +635,7 @@ public void testAddNonVersionedNestedPGWithProcessorShowsBothAdditions() { // SHALLOW strategy: only PG2 addition is reported (processor inside is not expanded) final StandardFlowComparator shallowComparator = new StandardFlowComparator( - registryFlow, localFlow, Collections.emptySet(), new StaticDifferenceDescriptor(), + registryFlow, localFlow, new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final Set shallowDifferences = shallowComparator.compare().getDifferences(); @@ -688,7 +685,7 @@ public void testAddProcessorToExistingNonVersionedNestedPGShowsProcessorAddition final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localRoot); final StandardFlowComparator testComparator = new StandardFlowComparator( - registryFlow, localFlow, Collections.emptySet(), new StaticDifferenceDescriptor(), + registryFlow, localFlow, new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final Set differences = testComparator.compare().getDifferences(); @@ -784,7 +781,7 @@ public void testNestedVersionedPGVersionChangeOnlyReportsVersionCoordinateDiffer final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localParent); final StandardFlowComparator testComparator = new StandardFlowComparator( - registryFlow, localFlow, Collections.emptySet(), new StaticDifferenceDescriptor(), + registryFlow, localFlow, new StaticDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); final Set differences = testComparator.compare().getDifferences(); @@ -805,6 +802,57 @@ public void testNestedVersionedPGVersionChangeOnlyReportsVersionCoordinateDiffer "Should only have 1 difference (child PG version coordinate change). Differences found: " + differences); } + /** + * Verifies that changing a processor's controller service property from one external service + * to another is detected as a PROPERTY_CHANGED difference. Previously, the comparator had + * suppression logic that could incorrectly hide such changes when the old service ID + * was no longer accessible. + */ + @Test + public void testExternalControllerServicePropertyChangeDetected() { + final String processorId = "processor1"; + final String servicePropertyKey = "my-service"; + final String oldServiceId = "service-X-id"; + final String newServiceId = "service-Y-id"; + + final VersionedPropertyDescriptor serviceDescriptor = new VersionedPropertyDescriptor(); + serviceDescriptor.setName(servicePropertyKey); + serviceDescriptor.setIdentifiesControllerService(true); + + final VersionedProcessor registryProcessor = new VersionedProcessor(); + registryProcessor.setIdentifier(processorId); + registryProcessor.setScheduledState(ScheduledState.ENABLED); + registryProcessor.setProperties(Map.of(servicePropertyKey, oldServiceId)); + registryProcessor.setPropertyDescriptors(Map.of(servicePropertyKey, serviceDescriptor)); + + final VersionedProcessor localProcessor = new VersionedProcessor(); + localProcessor.setIdentifier(processorId); + localProcessor.setScheduledState(ScheduledState.ENABLED); + localProcessor.setProperties(Map.of(servicePropertyKey, newServiceId)); + localProcessor.setPropertyDescriptors(Map.of(servicePropertyKey, serviceDescriptor)); + + final VersionedProcessGroup registryRoot = new VersionedProcessGroup(); + registryRoot.setIdentifier("rootPG"); + registryRoot.getProcessors().add(registryProcessor); + + final VersionedProcessGroup localRoot = new VersionedProcessGroup(); + localRoot.setIdentifier("rootPG"); + localRoot.getProcessors().add(localProcessor); + + final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryRoot); + final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localRoot); + + final StandardFlowComparator testComparator = new StandardFlowComparator( + registryFlow, localFlow, new StaticDifferenceDescriptor(), + Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); + + final Set differences = testComparator.compare().getDifferences(); + + assertEquals(1, differences.size()); + final FlowDifference diff = differences.iterator().next(); + assertEquals(DifferenceType.PROPERTY_CHANGED, diff.getDifferenceType()); + } + private VersionedFlowCoordinates createVersionedFlowCoordinates() { final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates(); coordinates.setRegistryId("registry"); diff --git a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java index 4726d8ec2285..1a0c2a22f2fa 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java +++ b/nifi-registry/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java @@ -925,7 +925,7 @@ public VersionedFlowDifference getFlowDiff(final String bucketIdentifier, final // Compare the two versions of the flow final FlowComparator flowComparator = new StandardFlowComparator(comparableFlowA, comparableFlowB, - null, new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP); + new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP); final FlowComparison flowComparison = flowComparator.compare(); final VersionedFlowDifference result = new VersionedFlowDifference(); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ExternalControllerServiceVersioningIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ExternalControllerServiceVersioningIT.java new file mode 100644 index 000000000000..d74000f79357 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/ExternalControllerServiceVersioningIT.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.registry; + +import org.apache.nifi.tests.system.NiFiClientUtil; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.DifferenceDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.apache.nifi.web.api.entity.FlowComparisonEntity; +import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * System tests verifying correct version-control behavior when a versioned Process Group + * references an external Controller Service (one defined in an ancestor group). + * + * Several tests simulate a cross-instance (dev to prod) scenario by committing a flow + * that references an external service, then deleting the original PG and service, creating + * a new service with the same name (but a different ID), and importing the flow from the + * registry. This reproduces the ID mismatch that occurs when a flow versioned on one NiFi + * instance is imported into another instance that has a same-named external service. + */ +public class ExternalControllerServiceVersioningIT extends NiFiSystemIT { + private static final String TEST_FLOWS_BUCKET = "test-flows"; + private static final String COUNT_SERVICE_TYPE = "StandardCountService"; + + /** + * Simulates a cross-instance import: a flow is committed referencing an external service, + * then the original PG and service are removed and a new service with the same name (but + * different ID) is created before re-importing from the registry. + * + * After import, the flow should be UP_TO_DATE because the external service was resolved + * by name during import and the cached snapshot should also be resolved. + * Both the state badge and the "Show Local Changes" dialog should agree. + */ + @Test + public void testCrossInstanceImportWithExternalServiceShowsUpToDate() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity registryClient = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ControllerServiceEntity devService = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + util.enableControllerService(devService); + + final ProcessGroupEntity child = util.createProcessGroup("Child", "root"); + final ProcessorEntity counter = util.createProcessor("CountFlowFiles", child.getId()); + util.updateProcessorProperties(counter, Collections.singletonMap("Count Service", devService.getComponent().getId())); + final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", child.getId()); + util.createConnection(counter, terminate, "success"); + + final VersionControlInformationEntity vci = util.startVersionControl(child, registryClient, TEST_FLOWS_BUCKET, "cross-instance-flow"); + util.assertFlowUpToDate(child.getId()); + final VersionControlInformationDTO vciDto = vci.getVersionControlInformation(); + + getNifiClient().getVersionsClient().stopVersionControl( + getNifiClient().getProcessGroupClient().getProcessGroup(child.getId())); + deleteProcessGroupContents(child.getId()); + getNifiClient().getProcessGroupClient().deleteProcessGroup( + getNifiClient().getProcessGroupClient().getProcessGroup(child.getId())); + + deleteControllerService(devService); + + final ControllerServiceEntity prodService = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + assertNotEquals(devService.getComponent().getId(), prodService.getComponent().getId(), + "Prod service should have a different ID than dev service"); + util.enableControllerService(prodService); + + final ProcessGroupEntity imported = util.importFlowFromRegistry("root", vciDto.getRegistryId(), + vciDto.getBucketId(), vciDto.getFlowId(), vciDto.getVersion()); + + waitForVersionedFlowState(imported.getId(), "root", "UP_TO_DATE"); + + final FlowComparisonEntity localMods = getNifiClient().getProcessGroupClient().getLocalModifications(imported.getId()); + assertTrue(localMods.getComponentDifferences().isEmpty(), + "After cross-instance import, Show Local Changes should report no differences"); + } + + /** + * Simulates a cross-instance upgrade where v1 and v2 both reference the same external + * service but differ in a non-service property (scheduling period). The original PG and + * service are deleted, a new service with the same name (different ID) is created, then + * the flow is re-imported at v1 and upgraded to v2. + * + * NiFi preserves existing external service references during upgrades, so the external + * service reference stays the same in both versions. After upgrade, the flow should be + * UP_TO_DATE because the only change (scheduling period) is applied by the synchronizer + * and the external service reference matches in both the local flow and the VCI snapshot. + */ + @Test + public void testCrossInstanceUpgradeWithExternalServiceShowsUpToDate() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity registryClient = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ControllerServiceEntity devService = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + util.enableControllerService(devService); + + final ProcessGroupEntity child = util.createProcessGroup("Child", "root"); + ProcessorEntity counter = util.createProcessor("CountFlowFiles", child.getId()); + util.updateProcessorProperties(counter, Collections.singletonMap("Count Service", devService.getComponent().getId())); + final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", child.getId()); + util.createConnection(counter, terminate, "success"); + + final VersionControlInformationEntity vci = util.startVersionControl(child, registryClient, TEST_FLOWS_BUCKET, "cross-instance-upgrade"); + util.assertFlowUpToDate(child.getId()); + + counter = util.updateProcessorSchedulingPeriod(counter, "10 sec"); + util.saveFlowVersion(child, registryClient, vci); + util.assertFlowUpToDate(child.getId()); + final VersionControlInformationDTO vciDto = vci.getVersionControlInformation(); + + getNifiClient().getVersionsClient().stopVersionControl( + getNifiClient().getProcessGroupClient().getProcessGroup(child.getId())); + deleteProcessGroupContents(child.getId()); + getNifiClient().getProcessGroupClient().deleteProcessGroup( + getNifiClient().getProcessGroupClient().getProcessGroup(child.getId())); + + deleteControllerService(devService); + + final ControllerServiceEntity prodService = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + assertNotEquals(devService.getComponent().getId(), prodService.getComponent().getId(), + "Prod service should have a different ID than dev service"); + util.enableControllerService(prodService); + + final ProcessGroupEntity imported = util.importFlowFromRegistry("root", vciDto.getRegistryId(), + vciDto.getBucketId(), vciDto.getFlowId(), "1"); + + waitForVersionedFlowState(imported.getId(), "root", "STALE"); + + util.changeFlowVersion(imported.getId(), "2"); + + waitForVersionedFlowState(imported.getId(), "root", "UP_TO_DATE"); + + final FlowComparisonEntity localMods = getNifiClient().getProcessGroupClient().getLocalModifications(imported.getId()); + assertTrue(localMods.getComponentDifferences().isEmpty(), + "After cross-instance upgrade, Show Local Changes should report no differences"); + } + + /** + * Reproduces the NIFI-15697 scenario on a single instance: + * + * 1. Create an external service "StandardCountService" and a child PG referencing it, commit. + * 2. Create a second external service "AlternateCountService", switch the processor to it. + * 3. Delete the original service. + * 4. Verify that both the state badge (LOCALLY_MODIFIED) and the dialog (shows the change) agree. + * + * The two services have different names so the name-based resolver cannot falsely reconcile them. + */ + @Test + public void testSwitchExternalServiceAndDeleteOriginalShowsLocalModification() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity registryClient = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ControllerServiceEntity serviceA = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + util.enableControllerService(serviceA); + + final ProcessGroupEntity child = util.createProcessGroup("Child", "root"); + final ProcessorEntity counter = util.createProcessor("CountFlowFiles", child.getId()); + util.updateProcessorProperties(counter, Collections.singletonMap("Count Service", serviceA.getComponent().getId())); + final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", child.getId()); + util.createConnection(counter, terminate, "success"); + + util.startVersionControl(child, registryClient, TEST_FLOWS_BUCKET, "nifi-15697-flow"); + util.assertFlowUpToDate(child.getId()); + + ControllerServiceEntity serviceB = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + serviceB = renameControllerService(serviceB, "AlternateCountService"); + util.enableControllerService(serviceB); + + util.updateProcessorProperties(counter, Collections.singletonMap("Count Service", serviceB.getComponent().getId())); + + String state = util.getVersionedFlowState(child.getId(), "root"); + assertEquals("LOCALLY_MODIFIED", state, "After switching external service reference, PG should be LOCALLY_MODIFIED"); + + deleteControllerService(serviceA); + + state = util.getVersionedFlowState(child.getId(), "root"); + assertEquals("LOCALLY_MODIFIED", state, "After deleting original service, PG should still be LOCALLY_MODIFIED"); + + final FlowComparisonEntity localMods = getNifiClient().getProcessGroupClient().getLocalModifications(child.getId()); + assertFalse(localMods.getComponentDifferences().isEmpty(), "Show Local Changes should report differences"); + + final boolean hasPropertyValueChange = localMods.getComponentDifferences().stream() + .flatMap(dto -> dto.getDifferences().stream()) + .map(DifferenceDTO::getDifferenceType) + .anyMatch(type -> type.contains("Property Value Changed")); + assertTrue(hasPropertyValueChange, "Differences should include a Property Value Changed difference"); + } + + /** + * Verifies that when a versioned PG references an external service and the user has NOT + * modified anything, both the state badge and the dialog report UP_TO_DATE / no changes. + */ + @Test + public void testExternalServiceReferenceWithoutModificationShowsUpToDate() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity registryClient = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ControllerServiceEntity service = util.createControllerService(COUNT_SERVICE_TYPE, "root"); + util.enableControllerService(service); + + final ProcessGroupEntity child = util.createProcessGroup("Child", "root"); + final ProcessorEntity counter = util.createProcessor("CountFlowFiles", child.getId()); + util.updateProcessorProperties(counter, Collections.singletonMap("Count Service", service.getComponent().getId())); + final ProcessorEntity terminate = util.createProcessor("TerminateFlowFile", child.getId()); + util.createConnection(counter, terminate, "success"); + + util.startVersionControl(child, registryClient, TEST_FLOWS_BUCKET, "ext-svc-unchanged"); + util.assertFlowUpToDate(child.getId()); + + final FlowComparisonEntity localMods = getNifiClient().getProcessGroupClient().getLocalModifications(child.getId()); + assertTrue(localMods.getComponentDifferences().isEmpty(), + "Show Local Changes should report no differences for an unmodified flow"); + } + + /** + * Deletes only the connections and processors within a Process Group, without touching + * Controller Services (which may be inherited from ancestor groups). + */ + private void deleteProcessGroupContents(final String groupId) throws NiFiClientException, IOException { + final ProcessGroupFlowEntity flowEntity = getNifiClient().getFlowClient().getProcessGroup(groupId); + final FlowDTO flowDto = flowEntity.getProcessGroupFlow().getFlow(); + + for (final ConnectionEntity connection : flowDto.getConnections()) { + connection.setDisconnectedNodeAcknowledged(true); + getNifiClient().getConnectionClient().deleteConnection(connection); + } + + for (final ProcessorEntity processor : flowDto.getProcessors()) { + processor.setDisconnectedNodeAcknowledged(true); + getNifiClient().getProcessorClient().deleteProcessor(processor); + } + } + + private ControllerServiceEntity renameControllerService(final ControllerServiceEntity service, final String newName) + throws NiFiClientException, IOException { + final ControllerServiceDTO dto = new ControllerServiceDTO(); + dto.setId(service.getId()); + dto.setName(newName); + + final ControllerServiceEntity entity = new ControllerServiceEntity(); + entity.setId(service.getId()); + entity.setComponent(dto); + entity.setRevision(service.getRevision()); + + return getNifiClient().getControllerServicesClient().updateControllerService(entity); + } + + private void deleteControllerService(final ControllerServiceEntity service) throws NiFiClientException, IOException, InterruptedException { + getClientUtil().disableControllerService(service); + waitForControllerServiceState(service.getId(), "DISABLED"); + final ControllerServiceEntity refreshed = getNifiClient().getControllerServicesClient().getControllerService(service.getId()); + getNifiClient().getControllerServicesClient().deleteControllerService(refreshed); + } + + private void waitForControllerServiceState(final String serviceId, final String expectedState) throws NiFiClientException, IOException, InterruptedException { + final long maxWait = System.currentTimeMillis() + 30_000; + while (System.currentTimeMillis() < maxWait) { + final ControllerServiceEntity entity = getNifiClient().getControllerServicesClient().getControllerService(serviceId); + if (expectedState.equalsIgnoreCase(entity.getComponent().getState())) { + return; + } + Thread.sleep(100L); + } + throw new AssertionError("Controller Service " + serviceId + " did not reach " + expectedState + " state within 30 seconds"); + } + + private void waitForVersionedFlowState(final String groupId, final String parentGroupId, final String expectedState) + throws NiFiClientException, IOException, InterruptedException { + + final long maxWait = System.currentTimeMillis() + 60_000; + while (System.currentTimeMillis() < maxWait) { + final String state = getClientUtil().getVersionedFlowState(groupId, parentGroupId); + if (expectedState.equalsIgnoreCase(state)) { + return; + } + Thread.sleep(500L); + } + + final String finalState = getClientUtil().getVersionedFlowState(groupId, parentGroupId); + if (expectedState.equalsIgnoreCase(finalState)) { + return; + } + + if ("LOCALLY_MODIFIED".equalsIgnoreCase(finalState) || "LOCALLY_MODIFIED_AND_STALE".equalsIgnoreCase(finalState)) { + final FlowComparisonEntity localMods = getNifiClient().getProcessGroupClient().getLocalModifications(groupId); + final StringBuilder sb = new StringBuilder(); + localMods.getComponentDifferences().stream() + .flatMap(dto -> dto.getDifferences().stream()) + .map(DifferenceDTO::getDifference) + .forEach(diff -> sb.append("\n - ").append(diff)); + throw new AssertionError("Expected versioned flow state " + expectedState + " but was " + finalState + + " with modifications:" + sb); + } + + throw new AssertionError("Expected versioned flow state " + expectedState + " but was " + finalState); + } +}