Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public void synchronize(final ProcessGroup group, final VersionedExternalFlow ve
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("Proposed Flow", versionedExternalFlow.getFlowContents());

final PropertyDecryptor decryptor = options.getPropertyDecryptor();
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, group.getAncestorServiceIds(),
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow,
new StaticDifferenceDescriptor(), decryptor::decrypt, options.getComponentComparisonIdLookup(), FlowComparatorVersionedStrategy.DEEP);
final FlowComparison flowComparison = flowComparator.compare();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.flow.ExternalControllerServiceReference;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import org.apache.nifi.flow.synchronization.StandardVersionedComponentSynchronizer;
import org.apache.nifi.flow.synchronization.VersionedFlowSynchronizationContext;
import org.apache.nifi.lifecycle.ProcessorStopLifecycleMethods;
Expand Down Expand Up @@ -2633,6 +2637,13 @@ public void removeControllerService(final ControllerServiceNode service) {
}
});

// When an ancestor controller service is removed, any descendant versioned PG whose
// committed snapshot referenced that service needs its cached differences invalidated,
// even if no component currently references the deleted service (e.g., the processor
// was already switched to a different service before the old one was deleted).
findAllProcessGroups(pg -> pg.getVersionControlInformation() != null)
.forEach(ProcessGroup::onComponentModified);

scheduler.submitFrameworkTask(() -> stateManagerProvider.onComponentRemoved(service.getIdentifier()));

removed = true;
Expand Down Expand Up @@ -3758,6 +3769,7 @@ public void synchronizeWithFlowRegistry(final FlowManager flowManager) {
final FlowSnapshotContainer registrySnapshotContainer = flowRegistry.getFlowContents(
FlowRegistryClientContextFactory.getAnonymousContext(), flowVersionLocation, false);
final RegisteredFlowSnapshot registrySnapshot = registrySnapshotContainer.getFlowSnapshot();
resolveExternalServiceReferences(registrySnapshot);
final VersionedProcessGroup registryFlow = registrySnapshot.getFlowContents();
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | FlowRegistryException e) {
Expand Down Expand Up @@ -3911,26 +3923,6 @@ public void synchronizeFlow(final VersionedExternalFlow proposedSnapshot, final
}
}

@Override
public Set<String> getAncestorServiceIds() {
final Set<String> ancestorServiceIds;
ProcessGroup parentGroup = getParent();

if (parentGroup == null) {
ancestorServiceIds = Collections.emptySet();
} else {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(cs -> cs.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(cs.getIdentifier())))
.collect(Collectors.toSet());
}

return ancestorServiceIds;
}

private String generateUuid(final String propposedId, final String destinationGroupId, final String seed) {
long msb = UUID.nameUUIDFromBytes((propposedId + destinationGroupId).getBytes(StandardCharsets.UTF_8)).getMostSignificantBits();

Expand All @@ -3947,6 +3939,81 @@ private String generateUuid(final String propposedId, final String destinationGr
return uuid.toString();
}

private void resolveExternalServiceReferences(final RegisteredFlowSnapshot snapshot) {
final Map<String, ExternalControllerServiceReference> externalRefs = snapshot.getExternalControllerServices();
if (externalRefs == null || externalRefs.isEmpty()) {
return;
}

final ProcessGroup parentGroup = getParent();
if (parentGroup == null) {
return;
}

final Map<String, String> serviceNameToVersionedId = new HashMap<>();
for (final ControllerServiceNode serviceNode : parentGroup.getControllerServices(true)) {
final String versionedId = serviceNode.getVersionedComponentId().orElse(
NiFiRegistryFlowMapper.generateVersionedComponentId(serviceNode.getIdentifier()));
serviceNameToVersionedId.put(serviceNode.getName(), versionedId);
}

final Map<String, String> foreignToLocalId = new HashMap<>();
for (final Map.Entry<String, ExternalControllerServiceReference> entry : externalRefs.entrySet()) {
final String foreignId = entry.getKey();
final String serviceName = entry.getValue().getName();
final String localId = serviceNameToVersionedId.get(serviceName);
if (localId != null && !localId.equals(foreignId)) {
foreignToLocalId.put(foreignId, localId);
}
}

if (!foreignToLocalId.isEmpty()) {
replaceExternalServiceIds(snapshot.getFlowContents(), foreignToLocalId);
}
}

private void replaceExternalServiceIds(final VersionedProcessGroup group, final Map<String, String> foreignToLocalId) {
if (group.getProcessors() != null) {
for (final VersionedProcessor processor : group.getProcessors()) {
replaceServicePropertyIds(processor.getProperties(), processor.getPropertyDescriptors(), foreignToLocalId);
}
}

if (group.getControllerServices() != null) {
for (final VersionedControllerService service : group.getControllerServices()) {
replaceServicePropertyIds(service.getProperties(), service.getPropertyDescriptors(), foreignToLocalId);
}
}

if (group.getProcessGroups() != null) {
for (final VersionedProcessGroup child : group.getProcessGroups()) {
replaceExternalServiceIds(child, foreignToLocalId);
}
}
}

private void replaceServicePropertyIds(final Map<String, String> properties, final Map<String, VersionedPropertyDescriptor> descriptors,
final Map<String, String> foreignToLocalId) {
if (properties == null || descriptors == null) {
return;
}

for (final Map.Entry<String, String> entry : properties.entrySet()) {
final String propertyValue = entry.getValue();
if (propertyValue == null) {
continue;
}

final VersionedPropertyDescriptor descriptor = descriptors.get(entry.getKey());
if (descriptor != null && descriptor.getIdentifiesControllerService()) {
final String localId = foreignToLocalId.get(propertyValue);
if (localId != null) {
entry.setValue(localId);
}
}
}
}

private Set<FlowDifference> getModifications() {
final StandardVersionControlInformation vci = versionControlInfo.get();

Expand Down Expand Up @@ -3975,7 +4042,7 @@ private Set<FlowDifference> getModifications() {
final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Local Flow", versionedGroup);
final ComparableDataFlow snapshotFlow = new StandardComparableDataFlow("Versioned Flow", vci.getFlowSnapshot());

final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow, getAncestorServiceIds(),
final FlowComparator flowComparator = new StandardFlowComparator(snapshotFlow, currentFlow,
new EvolvingDifferenceDescriptor(), encryptor::decrypt, VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW);
final FlowComparison comparison = flowComparator.compare();
final Collection<FlowDifference> comparisonDifferences = comparison.getDifferences();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,13 +596,6 @@ default CompletableFuture<Void> stopComponents() {
*/
Funnel findFunnel(String id);

/**
* Gets a collection of identifiers representing all ancestor controller services
*
* @return collection of ancestor controller service identifiers
*/
Set<String> getAncestorServiceIds();

/**
* @param id of the Controller Service
* @param includeDescendantGroups whether or not to include descendant process groups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ private FlowComparison compareFlows(final DataFlow existingFlow, final DataFlow
toSet(existingVersionedFlow.getRegistries())
);

final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow, Collections.emptySet(),
final FlowComparator flowComparator = new StandardFlowComparator(localDataFlow, clusterDataFlow,
differenceDescriptor, encryptor::decrypt, VersionedComponent::getInstanceIdentifier, FlowComparatorVersionedStrategy.DEEP);
return flowComparator.compare();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,6 @@ public Funnel findFunnel(final String id) {
return null;
}

@Override
public Set<String> getAncestorServiceIds() {
return null;
}

@Override
public ControllerServiceNode findControllerService(final String id, final boolean includeDescendants, final boolean includeAncestors) {
return serviceMap.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5546,7 +5546,6 @@ public FlowComparisonEntity getVersionDifference(final String registryId, FlowVe
final FlowComparator flowComparator = new StandardFlowComparator(
new StandardComparableDataFlow("Flow A", flowContentsA),
new StandardComparableDataFlow("Flow B", flowContentsB),
Collections.emptySet(), // Replacement of an external ControllerService is recognized as property change
new ConciseEvolvingDifferenceDescriptor(),
Function.identity(),
VersionedComponent::getIdentifier,
Expand Down Expand Up @@ -5626,6 +5625,12 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) {
versionControlInfo.getFlowIdentifier(), versionControlInfo.getVersion());
final FlowSnapshotContainer flowSnapshotContainer = flowRegistry.getFlowContents(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()),
flowVersionLocation, true);

// Resolve external controller service references by name so that cross-instance
// ID differences do not appear as phantom local modifications.
final String parentGroupId = processGroup.getParent() == null ? processGroup.getIdentifier() : processGroup.getParent().getIdentifier();
controllerFacade.getControllerServiceResolver().resolveInheritedControllerServices(flowSnapshotContainer, parentGroupId, NiFiUserUtils.getNiFiUser());

final RegisteredFlowSnapshot versionedFlowSnapshot = flowSnapshotContainer.getFlowSnapshot();
registryGroup = versionedFlowSnapshot.getFlowContents();
} catch (final IOException | FlowRegistryException e) {
Expand All @@ -5638,8 +5643,7 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) {
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup);
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", registryGroup);

final Set<String> ancestorServiceIds = processGroup.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, new ConciseEvolvingDifferenceDescriptor(),
final FlowComparator flowComparator = new StandardFlowComparator(registryFlow, localFlow, new ConciseEvolvingDifferenceDescriptor(),
Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW);
final FlowComparison flowComparison = flowComparator.compare();

Expand Down Expand Up @@ -5802,8 +5806,7 @@ public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final Stri
final ComparableDataFlow localFlow = new StandardComparableDataFlow("Current Flow", localContents);
final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents());

final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor(),
final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, new StaticDifferenceDescriptor(),
Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP);
final FlowComparison comparison = flowComparator.compare();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,6 @@ public void testGetComponentsAffectedByFlowUpdate_WithNewStatelessProcessGroup_R
final String groupId = UUID.randomUUID().toString();
final ProcessGroup processGroup = mock(ProcessGroup.class);
when(processGroupDAO.getProcessGroup(groupId)).thenReturn(processGroup);
when(processGroup.getAncestorServiceIds()).thenReturn(Collections.emptySet());

final FlowManager flowManager = mock(FlowManager.class);
final ExtensionManager extensionManager = mock(ExtensionManager.class);
when(flowController.getFlowManager()).thenReturn(flowManager);
Expand Down Expand Up @@ -352,7 +350,6 @@ public void testGetComponentsAffectedByFlowUpdate_WithNewStatelessProcessGroup_R
final FlowComparator flowComparator = new StandardFlowComparator(
localFlow,
proposedFlow,
Collections.emptySet(),
new StaticDifferenceDescriptor(),
Function.identity(),
VersionedComponent::getIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,16 @@ public class StandardFlowComparator implements FlowComparator {

private final ComparableDataFlow flowA;
private final ComparableDataFlow flowB;
private final Set<String> externallyAccessibleServiceIds;
private final DifferenceDescriptor differenceDescriptor;
private final Function<String, String> propertyDecryptor;
private final Function<VersionedComponent, String> idLookup;
private final FlowComparatorVersionedStrategy flowComparatorVersionedStrategy;

public StandardFlowComparator(final ComparableDataFlow flowA, final ComparableDataFlow flowB, final Set<String> externallyAccessibleServiceIds,
public StandardFlowComparator(final ComparableDataFlow flowA, final ComparableDataFlow flowB,
final DifferenceDescriptor differenceDescriptor, final Function<String, String> propertyDecryptor,
final Function<VersionedComponent, String> idLookup, final FlowComparatorVersionedStrategy flowComparatorVersionedStrategy) {
this.flowA = flowA;
this.flowB = flowB;
this.externallyAccessibleServiceIds = externallyAccessibleServiceIds;
this.differenceDescriptor = differenceDescriptor;
this.propertyDecryptor = propertyDecryptor;
this.idLookup = idLookup;
Expand Down Expand Up @@ -427,21 +425,6 @@ private void compareProperties(final VersionedComponent componentA, final Versio
differences.add(difference(DifferenceType.PROPERTY_REMOVED, componentA, componentB, key, displayName, valueA, valueB));
}
} else if (valueA != null && !valueA.equals(valueB)) {
// If the property in Flow A references a Controller Service that is not available in the flow
// and the property in Flow B references a Controller Service that is available in its environment
// but not part of the Versioned Flow, then we do not want to consider this to be a Flow Difference.
// This is typically the case when a flow is versioned in one instance, referencing an external Controller Service,
// and then imported into another NiFi instance. When imported, the property does not point to any existing Controller
// Service, and the user must then point the property an existing Controller Service. We don't want to consider the
// flow as having changed, since it is an environment-specific change (similar to how we handle variables).
if (descriptor != null && descriptor.getIdentifiesControllerService()) {
final boolean accessibleA = externallyAccessibleServiceIds.contains(valueA);
final boolean accessibleB = externallyAccessibleServiceIds.contains(valueB);
if (!accessibleA && accessibleB) {
return;
}
}

final boolean aParameterized = isParameterReference(valueA);
final boolean bParameterized = isParameterReference(valueB);
if (aParameterized && !bParameterized) {
Expand Down
Loading
Loading