From e5341cbf496e69e48154b6701adf8b0e061a84f7 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 11 Mar 2026 13:23:02 +0100 Subject: [PATCH] NIFI-15696 - Add Rebase operation for versioned process groups Signed-off-by: Pierre Villard --- .../nifi/web/api/dto/RebaseChangeDTO.java | 125 ++++ .../web/api/entity/RebaseAnalysisEntity.java | 110 ++++ .../web/api/entity/RebaseRequestEntity.java | 45 ++ .../apache/nifi/web/NiFiServiceFacade.java | 42 ++ .../nifi/web/StandardNiFiServiceFacade.java | 227 +++++++ .../apache/nifi/web/api/VersionsResource.java | 188 +++++- .../apache/nifi/web/api/dto/DtoFactory.java | 74 +++ .../nifi-registry-flow-diff/pom.xml | 4 + .../diff/BendpointsChangedRebaseHandler.java | 53 ++ .../diff/CommentsChangedRebaseHandler.java | 60 ++ .../diff/PositionChangedRebaseHandler.java | 53 ++ .../flow/diff/PropertyAddedRebaseHandler.java | 99 +++ .../diff/PropertyChangedRebaseHandler.java | 104 +++ .../registry/flow/diff/RebaseAnalysis.java | 105 +++ .../flow/diff/RebaseClassification.java | 37 ++ .../nifi/registry/flow/diff/RebaseEngine.java | 159 +++++ .../registry/flow/diff/RebaseHandler.java | 31 + .../flow/diff/RebaseHandlerUtils.java | 131 ++++ .../flow/diff/SizeChangedRebaseHandler.java | 61 ++ .../TestPropertyChangedRebaseHandler.java | 250 +++++++ .../registry/flow/diff/TestRebaseEngine.java | 609 ++++++++++++++++++ .../nifi/tests/system/NiFiClientUtil.java | 46 ++ .../system/registry/RebaseVersionIT.java | 405 ++++++++++++ .../nifi/toolkit/client/VersionsClient.java | 10 + .../client/impl/JerseyVersionsClient.java | 81 +++ 25 files changed, 3108 insertions(+), 1 deletion(-) create mode 100644 nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RebaseChangeDTO.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseAnalysisEntity.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseRequestEntity.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/BendpointsChangedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/CommentsChangedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PositionChangedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyAddedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyChangedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseAnalysis.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseClassification.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseEngine.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandlerUtils.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/SizeChangedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestPropertyChangedRebaseHandler.java create mode 100644 nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestRebaseEngine.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RebaseVersionIT.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RebaseChangeDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RebaseChangeDTO.java new file mode 100644 index 000000000000..b60db380647d --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/RebaseChangeDTO.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlType; + +@XmlType(name = "rebaseChange") +public class RebaseChangeDTO { + private String componentId; + private String componentName; + private String componentType; + private String differenceType; + private String fieldName; + private String localValue; + private String registryValue; + private String classification; + private String conflictCode; + private String conflictDetail; + + @Schema(description = "The ID of the component that was changed") + public String getComponentId() { + return componentId; + } + + public void setComponentId(final String componentId) { + this.componentId = componentId; + } + + @Schema(description = "The name of the component that was changed") + public String getComponentName() { + return componentName; + } + + public void setComponentName(final String componentName) { + this.componentName = componentName; + } + + @Schema(description = "The type of the component that was changed") + public String getComponentType() { + return componentType; + } + + public void setComponentType(final String componentType) { + this.componentType = componentType; + } + + @Schema(description = "The type of difference detected for this change") + public String getDifferenceType() { + return differenceType; + } + + public void setDifferenceType(final String differenceType) { + this.differenceType = differenceType; + } + + @Schema(description = "The name of the field that was changed, or null if not applicable") + public String getFieldName() { + return fieldName; + } + + public void setFieldName(final String fieldName) { + this.fieldName = fieldName; + } + + @Schema(description = "The local value of the field, or null if not applicable") + public String getLocalValue() { + return localValue; + } + + public void setLocalValue(final String localValue) { + this.localValue = localValue; + } + + @Schema(description = "The registry value of the field, or null if not applicable") + public String getRegistryValue() { + return registryValue; + } + + public void setRegistryValue(final String registryValue) { + this.registryValue = registryValue; + } + + @Schema(description = "The classification of this change: COMPATIBLE, CONFLICTING, or UNSUPPORTED") + public String getClassification() { + return classification; + } + + public void setClassification(final String classification) { + this.classification = classification; + } + + @Schema(description = "A code identifying the type of conflict, or null if the change is not conflicting") + public String getConflictCode() { + return conflictCode; + } + + public void setConflictCode(final String conflictCode) { + this.conflictCode = conflictCode; + } + + @Schema(description = "A detailed description of the conflict, or null if the change is not conflicting") + public String getConflictDetail() { + return conflictDetail; + } + + public void setConflictDetail(final String conflictDetail) { + this.conflictDetail = conflictDetail; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseAnalysisEntity.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseAnalysisEntity.java new file mode 100644 index 000000000000..55078fdac341 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseAnalysisEntity.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.ComponentDifferenceDTO; +import org.apache.nifi.web.api.dto.RebaseChangeDTO; + +import java.util.List; +import java.util.Set; + +@XmlRootElement(name = "rebaseAnalysisEntity") +public class RebaseAnalysisEntity extends Entity { + private String processGroupId; + private String currentVersion; + private String targetVersion; + private String analysisFingerprint; + private Boolean rebaseAllowed; + private List localChanges; + private Set upstreamChanges; + private String failureReason; + + @Schema(description = "The ID of the Process Group being rebased") + public String getProcessGroupId() { + return processGroupId; + } + + public void setProcessGroupId(final String processGroupId) { + this.processGroupId = processGroupId; + } + + @Schema(description = "The current version of the flow in the Process Group") + public String getCurrentVersion() { + return currentVersion; + } + + public void setCurrentVersion(final String currentVersion) { + this.currentVersion = currentVersion; + } + + @Schema(description = "The target version to rebase to") + public String getTargetVersion() { + return targetVersion; + } + + public void setTargetVersion(final String targetVersion) { + this.targetVersion = targetVersion; + } + + @Schema(description = "A fingerprint representing the state of this analysis, used to verify the analysis is still valid when executing the rebase") + public String getAnalysisFingerprint() { + return analysisFingerprint; + } + + public void setAnalysisFingerprint(final String analysisFingerprint) { + this.analysisFingerprint = analysisFingerprint; + } + + @Schema(description = "Whether the rebase is allowed based on the analysis of local and upstream changes") + public Boolean getRebaseAllowed() { + return rebaseAllowed; + } + + public void setRebaseAllowed(final Boolean rebaseAllowed) { + this.rebaseAllowed = rebaseAllowed; + } + + @Schema(description = "The list of local changes that were made to the flow since the last version control operation") + public List getLocalChanges() { + return localChanges; + } + + public void setLocalChanges(final List localChanges) { + this.localChanges = localChanges; + } + + @Schema(description = "The set of upstream changes between the current version and the target version in the flow registry") + public Set getUpstreamChanges() { + return upstreamChanges; + } + + public void setUpstreamChanges(final Set upstreamChanges) { + this.upstreamChanges = upstreamChanges; + } + + @Schema(description = "The reason the rebase is not allowed, or null if the rebase is allowed") + public String getFailureReason() { + return failureReason; + } + + public void setFailureReason(final String failureReason) { + this.failureReason = failureReason; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseRequestEntity.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseRequestEntity.java new file mode 100644 index 000000000000..3b38307bf489 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/RebaseRequestEntity.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "rebaseRequestEntity") +public class RebaseRequestEntity extends Entity { + private VersionControlInformationEntity versionControlInformationEntity; + private String analysisFingerprint; + + @Schema(description = "The Version Control information for the Process Group being rebased") + public VersionControlInformationEntity getVersionControlInformationEntity() { + return versionControlInformationEntity; + } + + public void setVersionControlInformationEntity(final VersionControlInformationEntity versionControlInformationEntity) { + this.versionControlInformationEntity = versionControlInformationEntity; + } + + @Schema(description = "The fingerprint of the rebase analysis, used to verify the analysis is still valid when executing the rebase") + public String getAnalysisFingerprint() { + return analysisFingerprint; + } + + public void setAnalysisFingerprint(final String analysisFingerprint) { + this.analysisFingerprint = analysisFingerprint; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 3b5babdc0692..4ee9409346c2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -144,6 +144,7 @@ import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; @@ -1569,6 +1570,47 @@ Set getControllerServiceTypes(final String serviceType, final */ FlowComparisonEntity getLocalModifications(String processGroupId); + /** + * Performs a rebase analysis for the given Process Group, comparing local modifications against + * upstream changes between the current version and the specified target version. + * + * @param processGroupId the ID of the Process Group + * @param targetVersion the target version to rebase to + * @return a RebaseAnalysisEntity that contains the analysis of local and upstream changes + * @throws IllegalStateException if the Process Group with the given ID is not under version control + */ + RebaseAnalysisEntity getRebaseAnalysis(String processGroupId, String targetVersion); + + /** + * Verifies that the Process Group with the given identifier can be rebased to a new version. + * + * @param processGroupId the ID of the Process Group + * @param targetVersion the target version to rebase to + * @throws IllegalStateException if the Process Group cannot be rebased + */ + void verifyCanRebase(String processGroupId, String targetVersion); + + /** + * Returns a FlowSnapshotContainer for the target version of the flow with the merged rebase contents applied. + * This re-runs the rebase analysis and verifies the fingerprint before producing the merged snapshot. + * + * @param processGroupId the ID of the Process Group + * @param targetVersion the target version to rebase to + * @param expectedAnalysisFingerprint the expected analysis fingerprint to validate that the analysis has not changed + * @return a FlowSnapshotContainer containing the target version snapshot with merged local changes applied + * @throws IllegalStateException if the rebase is not allowed or the fingerprint does not match + */ + FlowSnapshotContainer getRebasedFlowSnapshot(String processGroupId, String targetVersion, String expectedAnalysisFingerprint); + + /** + * Resets the Version Control Information snapshot for a process group after a rebase operation. + * After rebase, the VCI snapshot must reference the clean target version (not the merged snapshot) + * so that subsequent local modification checks correctly detect the preserved local changes. + * + * @param processGroupId the process group ID + */ + void resetVersionControlSnapshotAfterRebase(String processGroupId); + /** * Determines whether the process group with the given id or any of its descendants are under version control. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 940310d3e8c0..c719cea4c767 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -189,6 +189,7 @@ import org.apache.nifi.registry.flow.RegisteredFlow; import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.StandardVersionControlInformation; import org.apache.nifi.registry.flow.VerifiableFlowRegistryClient; import org.apache.nifi.registry.flow.VersionControlInformation; import org.apache.nifi.registry.flow.VersionedFlowState; @@ -199,6 +200,8 @@ import org.apache.nifi.registry.flow.diff.FlowComparatorVersionedStrategy; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.diff.RebaseAnalysis; +import org.apache.nifi.registry.flow.diff.RebaseEngine; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor; @@ -376,6 +379,7 @@ import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity; import org.apache.nifi.web.api.entity.ProcessorStatusEntity; import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupPortEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusEntity; @@ -452,6 +456,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; @@ -468,6 +473,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { private static final int VALIDATION_WAIT_MILLIS = 50; private static final String ROOT_PROCESS_GROUP = "RootProcessGroup"; + private final Map rebaseCleanSnapshots = new ConcurrentHashMap<>(); + // nifi core components private ControllerFacade controllerFacade; private SnippetUtils snippetUtils; @@ -5650,6 +5657,226 @@ public FlowComparisonEntity getLocalModifications(final String processGroupId) { return entity; } + @Override + public RebaseAnalysisEntity getRebaseAnalysis(final String processGroupId, final String targetVersion) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); + if (versionControlInfo == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control"); + } + + final String currentVersion = versionControlInfo.getVersion(); + if (currentVersion.equals(targetVersion)) { + throw new IllegalArgumentException("Target version %s is the same as the current version".formatted(targetVersion)); + } + + final FlowRegistryClientNode flowRegistry = flowRegistryDAO.getFlowRegistryClient(versionControlInfo.getRegistryIdentifier()); + if (flowRegistry == null) { + throw new IllegalStateException("Process Group with ID %s is tracking to a flow in Flow Registry with ID %s but cannot find a Flow Registry with that identifier" + .formatted(processGroupId, versionControlInfo.getRegistryIdentifier())); + } + + final FlowVersionLocation currentVersionLocation = new FlowVersionLocation(versionControlInfo.getBranch(), versionControlInfo.getBucketIdentifier(), + versionControlInfo.getFlowIdentifier(), currentVersion); + final FlowVersionLocation targetVersionLocation = new FlowVersionLocation(versionControlInfo.getBranch(), versionControlInfo.getBucketIdentifier(), + versionControlInfo.getFlowIdentifier(), targetVersion); + + final VersionedProcessGroup currentRegistryGroup; + final VersionedProcessGroup targetRegistryGroup; + try { + final FlowSnapshotContainer currentSnapshotContainer = flowRegistry.getFlowContents( + FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), currentVersionLocation, true); + currentRegistryGroup = currentSnapshotContainer.getFlowSnapshot().getFlowContents(); + + final FlowSnapshotContainer targetSnapshotContainer = flowRegistry.getFlowContents( + FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), targetVersionLocation, true); + targetRegistryGroup = targetSnapshotContainer.getFlowSnapshot().getFlowContents(); + } catch (final IOException | FlowRegistryException e) { + throw new NiFiCoreException("Failed to retrieve flow from Flow Registry in order to perform rebase analysis due to " + e.getMessage(), e); + } + + final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), controllerFacade.getFlowManager(), true); + + final Set ancestorServiceIds = processGroup.getAncestorServiceIds(); + + final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", currentRegistryGroup); + final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); + final FlowComparator localComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, + new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); + final FlowComparison localComparison = localComparator.compare(); + + final FlowDifferenceFilters.EnvironmentalChangeContext localEnvironmentalContext = + FlowDifferenceFilters.buildEnvironmentalChangeContext(localComparison.getDifferences(), controllerFacade.getFlowManager()); + final Set localDifferences = new HashSet<>(); + for (final FlowDifference difference : localComparison.getDifferences()) { + if (!FlowDifferenceFilters.isEnvironmentalChange(difference, localGroup, controllerFacade.getFlowManager(), localEnvironmentalContext)) { + localDifferences.add(difference); + } + } + + final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Current Version", currentRegistryGroup); + final ComparableDataFlow targetFlow = new StandardComparableDataFlow("Target Version", targetRegistryGroup); + final FlowComparator upstreamComparator = new StandardFlowComparator(currentFlow, targetFlow, ancestorServiceIds, + new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); + final FlowComparison upstreamComparison = upstreamComparator.compare(); + + final Set upstreamDifferences = new HashSet<>(); + for (final FlowDifference difference : upstreamComparison.getDifferences()) { + if (!FlowDifferenceFilters.isEnvironmentalChange(difference, currentRegistryGroup, controllerFacade.getFlowManager(), + FlowDifferenceFilters.buildEnvironmentalChangeContext(upstreamComparison.getDifferences(), controllerFacade.getFlowManager()))) { + upstreamDifferences.add(difference); + } + } + + boolean descendantHasLocalModifications = false; + String descendantFailureReason = null; + try { + processGroup.verifyCanRevertLocalModifications(); + } catch (final Exception e) { + descendantHasLocalModifications = true; + descendantFailureReason = e.getMessage(); + } + + final RebaseEngine rebaseEngine = new RebaseEngine(); + final RebaseAnalysis analysis = rebaseEngine.analyze(localDifferences, upstreamDifferences, targetRegistryGroup); + + if (descendantHasLocalModifications) { + final RebaseAnalysis blockedAnalysis = new RebaseAnalysis(analysis.getClassifiedLocalChanges(), upstreamDifferences, + false, analysis.getAnalysisFingerprint(), null); + return dtoFactory.createRebaseAnalysisEntity(blockedAnalysis, processGroupId, currentVersion, targetVersion, upstreamDifferences, + descendantFailureReason); + } + + return dtoFactory.createRebaseAnalysisEntity(analysis, processGroupId, currentVersion, targetVersion, upstreamDifferences, null); + } + + @Override + public void verifyCanRebase(final String processGroupId, final String targetVersion) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); + if (versionControlInfo == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control"); + } + + if (versionControlInfo.getVersion().equals(targetVersion)) { + throw new IllegalArgumentException("Target version %s is the same as the current version".formatted(targetVersion)); + } + + processGroup.verifyCanRevertLocalModifications(); + } + + @Override + public FlowSnapshotContainer getRebasedFlowSnapshot(final String processGroupId, final String targetVersion, final String expectedAnalysisFingerprint) { + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation versionControlInfo = processGroup.getVersionControlInformation(); + if (versionControlInfo == null) { + throw new IllegalStateException("Process Group with ID " + processGroupId + " is not under Version Control"); + } + + final String currentVersion = versionControlInfo.getVersion(); + final FlowRegistryClientNode flowRegistry = flowRegistryDAO.getFlowRegistryClient(versionControlInfo.getRegistryIdentifier()); + if (flowRegistry == null) { + throw new IllegalStateException("Process Group with ID %s is tracking to a flow in Flow Registry with ID %s but cannot find a Flow Registry with that identifier" + .formatted(processGroupId, versionControlInfo.getRegistryIdentifier())); + } + + final FlowVersionLocation currentVersionLocation = new FlowVersionLocation(versionControlInfo.getBranch(), versionControlInfo.getBucketIdentifier(), + versionControlInfo.getFlowIdentifier(), currentVersion); + final FlowVersionLocation targetVersionLocation = new FlowVersionLocation(versionControlInfo.getBranch(), versionControlInfo.getBucketIdentifier(), + versionControlInfo.getFlowIdentifier(), targetVersion); + + final VersionedProcessGroup currentRegistryGroup; + final FlowSnapshotContainer targetSnapshotContainer; + try { + final FlowSnapshotContainer currentSnapshotContainer = flowRegistry.getFlowContents( + FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), currentVersionLocation, true); + currentRegistryGroup = currentSnapshotContainer.getFlowSnapshot().getFlowContents(); + + targetSnapshotContainer = flowRegistry.getFlowContents( + FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), targetVersionLocation, true); + } catch (final IOException | FlowRegistryException e) { + throw new NiFiCoreException("Failed to retrieve flow from Flow Registry for rebase due to " + e.getMessage(), e); + } + + final VersionedProcessGroup targetRegistryGroup = targetSnapshotContainer.getFlowSnapshot().getFlowContents(); + final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager()); + final VersionedProcessGroup localGroup = mapper.mapProcessGroup(processGroup, controllerFacade.getControllerServiceProvider(), controllerFacade.getFlowManager(), true); + final Set ancestorServiceIds = processGroup.getAncestorServiceIds(); + + final ComparableDataFlow registryFlow = new StandardComparableDataFlow("Versioned Flow", currentRegistryGroup); + final ComparableDataFlow localFlow = new StandardComparableDataFlow("Local Flow", localGroup); + final FlowComparator localComparator = new StandardFlowComparator(registryFlow, localFlow, ancestorServiceIds, + new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); + final FlowComparison localComparison = localComparator.compare(); + + final FlowDifferenceFilters.EnvironmentalChangeContext localEnvironmentalContext = + FlowDifferenceFilters.buildEnvironmentalChangeContext(localComparison.getDifferences(), controllerFacade.getFlowManager()); + final Set localDifferences = new HashSet<>(); + for (final FlowDifference difference : localComparison.getDifferences()) { + if (!FlowDifferenceFilters.isEnvironmentalChange(difference, localGroup, controllerFacade.getFlowManager(), localEnvironmentalContext)) { + localDifferences.add(difference); + } + } + + final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Current Version", currentRegistryGroup); + final ComparableDataFlow targetFlow = new StandardComparableDataFlow("Target Version", targetRegistryGroup); + final FlowComparator upstreamComparator = new StandardFlowComparator(currentFlow, targetFlow, ancestorServiceIds, + new ConciseEvolvingDifferenceDescriptor(), Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.SHALLOW); + final FlowComparison upstreamComparison = upstreamComparator.compare(); + + final Set upstreamDifferences = new HashSet<>(); + for (final FlowDifference difference : upstreamComparison.getDifferences()) { + if (!FlowDifferenceFilters.isEnvironmentalChange(difference, currentRegistryGroup, controllerFacade.getFlowManager(), + FlowDifferenceFilters.buildEnvironmentalChangeContext(upstreamComparison.getDifferences(), controllerFacade.getFlowManager()))) { + upstreamDifferences.add(difference); + } + } + + final RebaseEngine rebaseEngine = new RebaseEngine(); + final RebaseAnalysis analysis = rebaseEngine.analyze(localDifferences, upstreamDifferences, targetRegistryGroup); + + if (!analysis.isRebaseAllowed()) { + throw new IllegalStateException("Rebase is not allowed due to conflicts between local changes and upstream changes"); + } + + if (!expectedAnalysisFingerprint.equals(analysis.getAnalysisFingerprint())) { + throw new IllegalStateException("The rebase analysis has changed since the analysis was performed. Please re-run the rebase analysis."); + } + + rebaseCleanSnapshots.put(processGroupId, targetRegistryGroup); + targetSnapshotContainer.getFlowSnapshot().setFlowContents(analysis.getMergedSnapshot()); + return targetSnapshotContainer; + } + + @Override + public void resetVersionControlSnapshotAfterRebase(final String processGroupId) { + final VersionedProcessGroup cleanSnapshot = rebaseCleanSnapshots.remove(processGroupId); + if (cleanSnapshot == null) { + return; + } + final ProcessGroup processGroup = processGroupDAO.getProcessGroup(processGroupId); + final VersionControlInformation existingVci = processGroup.getVersionControlInformation(); + if (existingVci == null) { + return; + } + final StandardVersionControlInformation updatedVci = new StandardVersionControlInformation.Builder() + .registryId(existingVci.getRegistryIdentifier()) + .registryName(existingVci.getRegistryName()) + .branch(existingVci.getBranch()) + .bucketId(existingVci.getBucketIdentifier()) + .bucketName(existingVci.getBucketName()) + .flowId(existingVci.getFlowIdentifier()) + .flowName(existingVci.getFlowName()) + .flowDescription(existingVci.getFlowDescription()) + .storageLocation(existingVci.getStorageLocation()) + .version(existingVci.getVersion()) + .status(existingVci.getStatus()) + .flowSnapshot(cleanSnapshot) + .build(); + processGroup.setVersionControlInformation(updatedVci, Collections.emptyMap()); + } + @Override public RegisteredFlow registerVersionedFlow(final String registryId, final RegisteredFlow flow) { final FlowRegistryClientNode registry = flowRegistryDAO.getFlowRegistryClient(registryId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index acf2f05102b8..bf814a4fe5df 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -66,6 +66,8 @@ import org.apache.nifi.web.api.entity.CreateActiveRequestEntity; import org.apache.nifi.web.api.entity.Entity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; +import org.apache.nifi.web.api.entity.RebaseRequestEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionControlComponentMappingEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; @@ -996,6 +998,186 @@ public Response deleteRevertRequest( return deleteFlowUpdateRequest("revert-requests", revertRequestId, disconnectedNodeAcknowledged); } + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("rebase-analysis/process-groups/{id}") + @Operation( + summary = "Gets a Rebase Analysis for a Process Group", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = RebaseAnalysisEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + description = "For a Process Group that is under Version Control, this will perform a rebase analysis by comparing " + + "local modifications against upstream changes between the current version and the specified target version. " + + "The analysis determines whether a rebase is allowed or if there are conflicts.", + security = { + @SecurityRequirement(name = "Read - /process-groups/{uuid}") + } + ) + public Response getRebaseAnalysis( + @Parameter(description = "The process group id.") @PathParam("id") final String processGroupId, + @Parameter(description = "The target version to rebase to.", required = true) @QueryParam("targetVersion") final String targetVersion) { + + if (targetVersion == null) { + throw new IllegalArgumentException("The target version must be specified."); + } + + serviceFacade.authorizeAccess(lookup -> { + final ProcessGroupAuthorizable groupAuthorizable = lookup.getProcessGroup(processGroupId); + authorizeProcessGroup(groupAuthorizable, authorizer, lookup, RequestAction.READ, true, + false, false, false, true); + }); + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + final RebaseAnalysisEntity entity = serviceFacade.getRebaseAnalysis(processGroupId, targetVersion); + return generateOkResponse(entity).build(); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Path("rebase-requests/process-groups/{id}") + @Operation( + summary = "Initiate a Rebase Request for a Process Group with the given ID", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = VersionedFlowUpdateRequestEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + description = "For a Process Group that is already under Version Control, this will initiate the action of rebasing " + + "the flow to a different version while preserving compatible local changes. This can be a lengthy " + + "process, as it will stop any Processors and disable any Controller Services necessary to perform the action and then restart them. As a result, " + + "the endpoint will immediately return a VersionedFlowUpdateRequestEntity, and the process of rebasing the flow will occur " + + "asynchronously in the background. The client may then periodically poll the status of the request by issuing a GET request to " + + "/versions/rebase-requests/{requestId}. Once the request is completed, the client is expected to issue a DELETE request to " + + "/versions/rebase-requests/{requestId}. " + NON_GUARANTEED_ENDPOINT, + security = { + @SecurityRequirement(name = "Read - /process-groups/{uuid}"), + @SecurityRequirement(name = "Write - /process-groups/{uuid}"), + @SecurityRequirement(name = "Read - /{component-type}/{uuid} - For all encapsulated components"), + @SecurityRequirement(name = "Write - /{component-type}/{uuid} - For all encapsulated components"), + @SecurityRequirement(name = "Write - if the template contains any restricted components - /restricted-components"), + @SecurityRequirement(name = "Read - /parameter-contexts/{uuid} - For any Parameter Context that is referenced by a Property that is changed, added, or removed") + } + ) + public Response initiateRebase( + @Parameter(description = "The process group id.") @PathParam("id") final String groupId, + @Parameter(description = "The rebase request details.", required = true) final RebaseRequestEntity requestEntity) { + + if (requestEntity == null) { + throw new IllegalArgumentException("Rebase request must be specified."); + } + + final String analysisFingerprint = requestEntity.getAnalysisFingerprint(); + if (analysisFingerprint == null) { + throw new IllegalArgumentException("The analysis fingerprint must be supplied."); + } + + final VersionControlInformationEntity vciEntity = requestEntity.getVersionControlInformationEntity(); + if (vciEntity == null) { + throw new IllegalArgumentException("Version Control Information must be supplied."); + } + + final VersionControlInformationDTO requestVersionControlInfoDto = vciEntity.getVersionControlInformation(); + if (requestVersionControlInfoDto == null) { + throw new IllegalArgumentException("Version Control Information must be supplied."); + } + if (requestVersionControlInfoDto.getGroupId() == null) { + throw new IllegalArgumentException("The Process Group ID must be supplied."); + } + if (!requestVersionControlInfoDto.getGroupId().equals(groupId)) { + throw new IllegalArgumentException("The Process Group ID in the request body does not match the Process Group ID of the requested resource."); + } + if (requestVersionControlInfoDto.getBucketId() == null) { + throw new IllegalArgumentException("The Bucket ID must be supplied."); + } + if (requestVersionControlInfoDto.getFlowId() == null) { + throw new IllegalArgumentException("The Flow ID must be supplied."); + } + if (requestVersionControlInfoDto.getRegistryId() == null) { + throw new IllegalArgumentException("The Registry ID must be supplied."); + } + if (requestVersionControlInfoDto.getVersion() == null) { + throw new IllegalArgumentException("The Version of the flow must be supplied."); + } + + final String targetVersion = requestVersionControlInfoDto.getVersion(); + + return initiateFlowUpdate(groupId, vciEntity, true, "rebase-requests", + "/nifi-api/versions/process-groups/" + groupId, + () -> serviceFacade.getRebasedFlowSnapshot(groupId, targetVersion, analysisFingerprint) + ); + } + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("rebase-requests/{id}") + @Operation( + summary = "Returns the Rebase Request with the given ID", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = VersionedFlowUpdateRequestEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + description = "Returns the Rebase Request with the given ID. Once a Rebase Request has been created by performing a POST to /versions/rebase-requests/process-groups/{id}, " + + "that request can subsequently be retrieved via this endpoint, and the request that is fetched will contain the updated state, such as percent complete, the " + + "current state of the request, and any failures. " + + NON_GUARANTEED_ENDPOINT, + security = { + @SecurityRequirement(name = "Only the user that submitted the request can get it") + } + ) + public Response getRebaseRequest(@Parameter(description = "The ID of the Rebase Request") @PathParam("id") final String rebaseRequestId) { + return retrieveFlowUpdateRequest("rebase-requests", rebaseRequestId); + } + + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("rebase-requests/{id}") + @Operation( + summary = "Deletes the Rebase Request with the given ID", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = VersionedFlowUpdateRequestEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + description = "Deletes the Rebase Request with the given ID. After a request is created via a POST to /versions/rebase-requests/process-groups/{id}, it is expected " + + "that the client will properly clean up the request by DELETE'ing it, once the Rebase process has completed. If the request is deleted before the request " + + "completes, then the Rebase request will finish the step that it is currently performing and then will cancel any subsequent steps. " + + NON_GUARANTEED_ENDPOINT, + security = { + @SecurityRequirement(name = "Only the user that submitted the request can remove it") + } + ) + public Response deleteRebaseRequest( + @Parameter( + description = "Acknowledges that this node is disconnected to allow for mutable requests to proceed." + ) + @QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false") final Boolean disconnectedNodeAcknowledged, + @Parameter(description = "The ID of the Rebase Request") @PathParam("id") final String rebaseRequestId) { + + return deleteFlowUpdateRequest("rebase-requests", rebaseRequestId, disconnectedNodeAcknowledged.booleanValue()); + } + @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) @@ -1237,8 +1419,12 @@ protected ProcessGroupEntity performUpdateFlow(final String groupId, final Revis versionControlInfo.setVersion(metadata.getVersion()); versionControlInfo.setState(flowSnapshot.isLatest() ? VersionedFlowState.UP_TO_DATE.name() : VersionedFlowState.STALE.name()); - return serviceFacade.updateProcessGroupContents(revision, groupId, versionControlInfo, flowSnapshot, idGenerationSeed, + final ProcessGroupEntity result = serviceFacade.updateProcessGroupContents(revision, groupId, versionControlInfo, flowSnapshot, idGenerationSeed, verifyNotModified, false, updateDescendantVersionedFlows); + + serviceFacade.resetVersionControlSnapshotAfterRebase(groupId); + + return result; } /** diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 6954dc6607a9..63a2a0f61b21 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -159,6 +159,7 @@ import org.apache.nifi.registry.flow.VersionedFlowStatus; import org.apache.nifi.registry.flow.diff.FlowComparison; import org.apache.nifi.registry.flow.diff.FlowDifference; +import org.apache.nifi.registry.flow.diff.RebaseAnalysis; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection; import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService; @@ -251,6 +252,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity; import org.apache.nifi.web.api.entity.TenantEntity; @@ -2926,6 +2928,78 @@ private ComponentDifferenceDTO createComponentDifference(final FlowDifference di return dto; } + public RebaseAnalysisEntity createRebaseAnalysisEntity(final RebaseAnalysis analysis, final String processGroupId, + final String currentVersion, final String targetVersion, + final Set upstreamDifferences, + final String failureReasonOverride) { + final RebaseAnalysisEntity entity = new RebaseAnalysisEntity(); + entity.setProcessGroupId(processGroupId); + entity.setCurrentVersion(currentVersion); + entity.setTargetVersion(targetVersion); + entity.setRebaseAllowed(analysis.isRebaseAllowed()); + entity.setAnalysisFingerprint(analysis.getAnalysisFingerprint()); + + final List localChangeDtos = new ArrayList<>(); + for (final RebaseAnalysis.ClassifiedDifference classified : analysis.getClassifiedLocalChanges()) { + localChangeDtos.add(createRebaseChangeDto(classified)); + } + entity.setLocalChanges(localChangeDtos); + + final Set upstreamChangeDtos = new HashSet<>(); + final Map> differencesByComponent = new HashMap<>(); + for (final FlowDifference difference : upstreamDifferences) { + final ComponentDifferenceDTO componentDiff = createComponentDifference(difference); + final List differences = differencesByComponent.computeIfAbsent(componentDiff, key -> new ArrayList<>()); + differences.add(createDifferenceDto(difference)); + } + for (final Map.Entry> entry : differencesByComponent.entrySet()) { + entry.getKey().setDifferences(entry.getValue()); + upstreamChangeDtos.add(entry.getKey()); + } + entity.setUpstreamChanges(upstreamChangeDtos); + + if (!analysis.isRebaseAllowed()) { + if (failureReasonOverride != null) { + entity.setFailureReason(failureReasonOverride); + } else { + final StringBuilder failureReason = new StringBuilder(); + for (final RebaseAnalysis.ClassifiedDifference classified : analysis.getClassifiedLocalChanges()) { + if (classified.getConflictCode() != null) { + if (!failureReason.isEmpty()) { + failureReason.append("; "); + } + failureReason.append(classified.getConflictDetail()); + } + } + entity.setFailureReason(failureReason.toString()); + } + } + + return entity; + } + + private RebaseChangeDTO createRebaseChangeDto(final RebaseAnalysis.ClassifiedDifference classified) { + final FlowDifference difference = classified.getDifference(); + final RebaseChangeDTO dto = new RebaseChangeDTO(); + + final VersionedComponent component = difference.getComponentB() != null ? difference.getComponentB() : difference.getComponentA(); + if (component != null) { + dto.setComponentId(component.getIdentifier()); + dto.setComponentName(component.getName()); + dto.setComponentType(component.getComponentType().toString()); + } + + dto.setDifferenceType(difference.getDifferenceType().getDescription()); + dto.setFieldName(difference.getFieldName().orElse(null)); + dto.setLocalValue(difference.getValueB() == null ? null : difference.getValueB().toString()); + dto.setRegistryValue(difference.getValueA() == null ? null : difference.getValueA().toString()); + dto.setClassification(classified.getClassification().name()); + dto.setConflictCode(classified.getConflictCode()); + dto.setConflictDetail(classified.getConflictDetail()); + + return dto; + } + public FlowRegistryBranchDTO createBranchDTO(final FlowRegistryBranch branch) { final FlowRegistryBranchDTO branchDTO = new FlowRegistryBranchDTO(); branchDTO.setName(branch.getName()); diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/pom.xml b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/pom.xml index 2a9273cc8160..fbc8d4d09a8a 100644 --- a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/pom.xml +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/pom.xml @@ -29,5 +29,9 @@ org.apache.nifi nifi-api + + com.fasterxml.jackson.core + jackson-databind + diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/BendpointsChangedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/BendpointsChangedRebaseHandler.java new file mode 100644 index 000000000000..7f0d106ebbd4 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/BendpointsChangedRebaseHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class BendpointsChangedRebaseHandler implements RebaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(BendpointsChangedRebaseHandler.class); + + @Override + public DifferenceType getSupportedType() { + return DifferenceType.BENDPOINTS_CHANGED; + } + + @Override + public RebaseAnalysis.ClassifiedDifference classify(final FlowDifference localDifference, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + + @Override + public void apply(final FlowDifference localDifference, final VersionedProcessGroup mergedFlow) { + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + final VersionedConnection connection = RebaseHandlerUtils.findConnectionById(mergedFlow, componentIdentifier); + if (connection == null) { + logger.warn("Unable to apply bendpoint change: connection [{}] not found in merged flow", componentIdentifier); + return; + } + final VersionedConnection localConnection = (VersionedConnection) localDifference.getComponentB(); + connection.setBends(localConnection.getBends()); + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/CommentsChangedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/CommentsChangedRebaseHandler.java new file mode 100644 index 000000000000..a109e54daa3a --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/CommentsChangedRebaseHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class CommentsChangedRebaseHandler implements RebaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(CommentsChangedRebaseHandler.class); + + @Override + public DifferenceType getSupportedType() { + return DifferenceType.COMMENTS_CHANGED; + } + + @Override + public RebaseAnalysis.ClassifiedDifference classify(final FlowDifference localDifference, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + for (final FlowDifference upstreamDifference : upstreamDifferences) { + if (upstreamDifference.getDifferenceType() == DifferenceType.COMMENTS_CHANGED + && componentIdentifier.equals(upstreamDifference.getComponentA().getIdentifier())) { + return RebaseAnalysis.ClassifiedDifference.conflicting(localDifference, "SAME_COMPONENT_COMMENTS", + "Both local and upstream flows modified comments on component " + componentIdentifier); + } + } + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + + @Override + public void apply(final FlowDifference localDifference, final VersionedProcessGroup mergedFlow) { + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + final VersionedComponent component = RebaseHandlerUtils.findComponentById(mergedFlow, componentIdentifier); + if (component == null) { + logger.warn("Unable to apply comments change: component [{}] not found in merged flow", componentIdentifier); + return; + } + component.setComments((String) localDifference.getValueB()); + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PositionChangedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PositionChangedRebaseHandler.java new file mode 100644 index 000000000000..d95900d52668 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PositionChangedRebaseHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.Position; +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class PositionChangedRebaseHandler implements RebaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(PositionChangedRebaseHandler.class); + + @Override + public DifferenceType getSupportedType() { + return DifferenceType.POSITION_CHANGED; + } + + @Override + public RebaseAnalysis.ClassifiedDifference classify(final FlowDifference localDifference, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + + @Override + public void apply(final FlowDifference localDifference, final VersionedProcessGroup mergedFlow) { + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + final VersionedComponent component = RebaseHandlerUtils.findComponentById(mergedFlow, componentIdentifier); + if (component == null) { + logger.warn("Unable to apply position change: component [{}] not found in merged flow", componentIdentifier); + return; + } + component.setPosition((Position) localDifference.getValueB()); + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyAddedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyAddedRebaseHandler.java new file mode 100644 index 000000000000..049a8388dab0 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyAddedRebaseHandler.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedConfigurableComponent; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PropertyAddedRebaseHandler implements RebaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(PropertyAddedRebaseHandler.class); + + @Override + public DifferenceType getSupportedType() { + return DifferenceType.PROPERTY_ADDED; + } + + @Override + public RebaseAnalysis.ClassifiedDifference classify(final FlowDifference localDifference, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + final String propertyName = localDifference.getFieldName().orElse(null); + if (propertyName == null) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "MISSING_FIELD_NAME", + "Property added difference does not specify a field name"); + } + + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + + for (final FlowDifference upstreamDifference : upstreamDifferences) { + if ((upstreamDifference.getDifferenceType() == DifferenceType.PROPERTY_ADDED + || upstreamDifference.getDifferenceType() == DifferenceType.PROPERTY_CHANGED) + && componentIdentifier.equals(upstreamDifference.getComponentA().getIdentifier()) + && upstreamDifference.getFieldName().isPresent() + && propertyName.equals(upstreamDifference.getFieldName().get())) { + if (Objects.equals(localDifference.getValueB(), upstreamDifference.getValueB())) { + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + return RebaseAnalysis.ClassifiedDifference.conflicting(localDifference, "SAME_PROPERTY", + "Both local and upstream flows set property '%s' on component %s".formatted(propertyName, componentIdentifier)); + } + } + + final VersionedConfigurableComponent targetComponent = RebaseHandlerUtils.findConfigurableComponentById(targetSnapshot, componentIdentifier); + if (targetComponent == null) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "COMPONENT_NOT_FOUND", + "Component %s not found in target snapshot".formatted(componentIdentifier)); + } + + final Map descriptors = targetComponent.getPropertyDescriptors(); + if (descriptors == null || !descriptors.containsKey(propertyName)) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "DESCRIPTOR_NOT_FOUND", + "Property descriptor '%s' does not exist on component %s in target version".formatted(propertyName, componentIdentifier)); + } + + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + + @Override + public void apply(final FlowDifference localDifference, final VersionedProcessGroup mergedFlow) { + final String propertyName = localDifference.getFieldName().orElse(null); + if (propertyName == null) { + return; + } + + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + final VersionedConfigurableComponent component = RebaseHandlerUtils.findConfigurableComponentById(mergedFlow, componentIdentifier); + if (component == null) { + logger.warn("Unable to apply property addition: configurable component [{}] not found in merged flow", componentIdentifier); + return; + } + + final Map existingProperties = component.getProperties(); + final Map updatedProperties = existingProperties != null ? new HashMap<>(existingProperties) : new HashMap<>(); + updatedProperties.put(propertyName, (String) localDifference.getValueB()); + component.setProperties(updatedProperties); + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyChangedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyChangedRebaseHandler.java new file mode 100644 index 000000000000..a9709cfdb45a --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/PropertyChangedRebaseHandler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedConfigurableComponent; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PropertyChangedRebaseHandler implements RebaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(PropertyChangedRebaseHandler.class); + + @Override + public DifferenceType getSupportedType() { + return DifferenceType.PROPERTY_CHANGED; + } + + @Override + public RebaseAnalysis.ClassifiedDifference classify(final FlowDifference localDifference, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + final String propertyName = localDifference.getFieldName().orElse(null); + if (propertyName == null) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "MISSING_FIELD_NAME", + "Property change difference does not specify a field name"); + } + + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + + for (final FlowDifference upstreamDifference : upstreamDifferences) { + if (upstreamDifference.getDifferenceType() == DifferenceType.PROPERTY_CHANGED + && componentIdentifier.equals(upstreamDifference.getComponentA().getIdentifier()) + && upstreamDifference.getFieldName().isPresent() + && propertyName.equals(upstreamDifference.getFieldName().get())) { + if (Objects.equals(localDifference.getValueB(), upstreamDifference.getValueB())) { + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + return RebaseAnalysis.ClassifiedDifference.conflicting(localDifference, "SAME_PROPERTY", + "Both local and upstream flows modified property '%s' on component %s".formatted(propertyName, componentIdentifier)); + } + } + + if (localDifference.getValueA() == null) { + final VersionedConfigurableComponent targetComponent = RebaseHandlerUtils.findConfigurableComponentById(targetSnapshot, componentIdentifier); + if (targetComponent == null) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "COMPONENT_NOT_FOUND", + "Component %s not found in target snapshot".formatted(componentIdentifier)); + } + final Map descriptors = targetComponent.getPropertyDescriptors(); + if (descriptors == null || !descriptors.containsKey(propertyName)) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "DESCRIPTOR_CHANGED", + "Property descriptor '%s' no longer exists on component %s".formatted(propertyName, componentIdentifier)); + } + final VersionedPropertyDescriptor descriptor = descriptors.get(propertyName); + if (!descriptor.isSensitive()) { + return RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "DESCRIPTOR_CHANGED", + "Property '%s' on component %s is no longer sensitive".formatted(propertyName, componentIdentifier)); + } + } + + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + + @Override + public void apply(final FlowDifference localDifference, final VersionedProcessGroup mergedFlow) { + final String propertyName = localDifference.getFieldName().orElse(null); + if (propertyName == null) { + return; + } + + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + final VersionedConfigurableComponent component = RebaseHandlerUtils.findConfigurableComponentById(mergedFlow, componentIdentifier); + if (component == null) { + logger.warn("Unable to apply property change: configurable component [{}] not found in merged flow", componentIdentifier); + return; + } + + final Map existingProperties = component.getProperties(); + final Map updatedProperties = existingProperties != null ? new HashMap<>(existingProperties) : new HashMap<>(); + updatedProperties.put(propertyName, (String) localDifference.getValueB()); + component.setProperties(updatedProperties); + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseAnalysis.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseAnalysis.java new file mode 100644 index 000000000000..4ddbb1f6bb65 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseAnalysis.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedProcessGroup; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +public class RebaseAnalysis { + + private final List classifiedLocalChanges; + private final Set upstreamDifferences; + private final boolean rebaseAllowed; + private final String analysisFingerprint; + private final VersionedProcessGroup mergedSnapshot; + + public RebaseAnalysis(final List classifiedLocalChanges, final Set upstreamDifferences, + final boolean rebaseAllowed, final String analysisFingerprint, final VersionedProcessGroup mergedSnapshot) { + this.classifiedLocalChanges = Objects.requireNonNull(classifiedLocalChanges, "Classified local changes are required"); + this.upstreamDifferences = Objects.requireNonNull(upstreamDifferences, "Upstream differences are required"); + this.rebaseAllowed = rebaseAllowed; + this.analysisFingerprint = Objects.requireNonNull(analysisFingerprint, "Analysis fingerprint is required"); + this.mergedSnapshot = mergedSnapshot; + } + + public List getClassifiedLocalChanges() { + return classifiedLocalChanges; + } + + public Set getUpstreamDifferences() { + return upstreamDifferences; + } + + public boolean isRebaseAllowed() { + return rebaseAllowed; + } + + public String getAnalysisFingerprint() { + return analysisFingerprint; + } + + public VersionedProcessGroup getMergedSnapshot() { + return mergedSnapshot; + } + + public static class ClassifiedDifference { + private final FlowDifference difference; + private final RebaseClassification classification; + private final String conflictCode; + private final String conflictDetail; + + public ClassifiedDifference(final FlowDifference difference, final RebaseClassification classification, + final String conflictCode, final String conflictDetail) { + this.difference = Objects.requireNonNull(difference, "Difference is required"); + this.classification = Objects.requireNonNull(classification, "Classification is required"); + this.conflictCode = conflictCode; + this.conflictDetail = conflictDetail; + } + + public static ClassifiedDifference compatible(final FlowDifference difference) { + return new ClassifiedDifference(difference, RebaseClassification.COMPATIBLE, null, null); + } + + public static ClassifiedDifference conflicting(final FlowDifference difference, final String conflictCode, final String conflictDetail) { + return new ClassifiedDifference(difference, RebaseClassification.CONFLICTING, conflictCode, conflictDetail); + } + + public static ClassifiedDifference unsupported(final FlowDifference difference, final String conflictCode, final String conflictDetail) { + return new ClassifiedDifference(difference, RebaseClassification.UNSUPPORTED, conflictCode, conflictDetail); + } + + public FlowDifference getDifference() { + return difference; + } + + public RebaseClassification getClassification() { + return classification; + } + + public String getConflictCode() { + return conflictCode; + } + + public String getConflictDetail() { + return conflictDetail; + } + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseClassification.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseClassification.java new file mode 100644 index 000000000000..fe21694d62c9 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseClassification.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +public enum RebaseClassification { + + COMPATIBLE("Compatible"), + + CONFLICTING("Conflicting"), + + UNSUPPORTED("Unsupported"); + + private final String description; + + RebaseClassification(final String description) { + this.description = description; + } + + public String getDescription() { + return description; + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseEngine.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseEngine.java new file mode 100644 index 000000000000..0a5b445b62f7 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseEngine.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.flow.VersionedProcessGroup; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HexFormat; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +public class RebaseEngine { + + private static final ObjectMapper OBJECT_MAPPER = createObjectMapper(); + + private final Map handlerRegistry; + + public RebaseEngine() { + this.handlerRegistry = new HashMap<>(); + registerHandler(new PositionChangedRebaseHandler()); + registerHandler(new SizeChangedRebaseHandler()); + registerHandler(new BendpointsChangedRebaseHandler()); + registerHandler(new PropertyChangedRebaseHandler()); + registerHandler(new PropertyAddedRebaseHandler()); + registerHandler(new CommentsChangedRebaseHandler()); + } + + public RebaseEngine(final Map handlerRegistry) { + this.handlerRegistry = new HashMap<>(handlerRegistry); + } + + public RebaseAnalysis analyze(final Set localDifferences, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + final List classifiedChanges = new ArrayList<>(); + boolean allCompatible = true; + + for (final FlowDifference localDifference : localDifferences) { + final RebaseHandler handler = handlerRegistry.get(localDifference.getDifferenceType()); + if (handler == null) { + classifiedChanges.add(RebaseAnalysis.ClassifiedDifference.unsupported(localDifference, "NO_HANDLER", + "No rebase handler registered for difference type: " + localDifference.getDifferenceType().getDescription())); + allCompatible = false; + continue; + } + + final RebaseAnalysis.ClassifiedDifference classified = handler.classify(localDifference, upstreamDifferences, targetSnapshot); + classifiedChanges.add(classified); + if (classified.getClassification() != RebaseClassification.COMPATIBLE) { + allCompatible = false; + } + } + + VersionedProcessGroup mergedSnapshot = null; + if (allCompatible) { + mergedSnapshot = deepClone(targetSnapshot); + for (final RebaseAnalysis.ClassifiedDifference classified : classifiedChanges) { + final RebaseHandler handler = handlerRegistry.get(classified.getDifference().getDifferenceType()); + if (handler != null) { + handler.apply(classified.getDifference(), mergedSnapshot); + } + } + } + + final String fingerprint = computeAnalysisFingerprint(classifiedChanges, upstreamDifferences); + return new RebaseAnalysis(classifiedChanges, upstreamDifferences, allCompatible, fingerprint, mergedSnapshot); + } + + public static String computeConflictKey(final FlowDifference difference) { + final DifferenceType type = difference.getDifferenceType(); + final String componentId = resolveComponentIdentifier(difference); + final Optional fieldName = difference.getFieldName(); + + return switch (type) { + case PROPERTY_CHANGED -> type.name() + ":" + componentId + ":" + fieldName.orElse(""); + case POSITION_CHANGED, SIZE_CHANGED, COMMENTS_CHANGED, DESCRIPTION_CHANGED -> type.name() + ":" + componentId; + case BENDPOINTS_CHANGED -> type.name() + ":" + componentId; + default -> type.name() + ":" + componentId + ":" + fieldName.orElse(""); + }; + } + + VersionedProcessGroup deepClone(final VersionedProcessGroup source) { + try { + final byte[] serialized = OBJECT_MAPPER.writeValueAsBytes(source); + return OBJECT_MAPPER.readValue(serialized, VersionedProcessGroup.class); + } catch (final Exception e) { + throw new RuntimeException("Failed to deep clone VersionedProcessGroup", e); + } + } + + private static String resolveComponentIdentifier(final FlowDifference difference) { + if (difference.getComponentB() != null) { + return difference.getComponentB().getIdentifier(); + } + if (difference.getComponentA() != null) { + return difference.getComponentA().getIdentifier(); + } + return ""; + } + + private String computeAnalysisFingerprint(final List classifiedChanges, + final Set upstreamDifferences) { + final SortedSet sortedKeys = new TreeSet<>(); + + for (final RebaseAnalysis.ClassifiedDifference classified : classifiedChanges) { + final String key = computeConflictKey(classified.getDifference()); + sortedKeys.add("local:" + key + ":" + classified.getClassification().name()); + } + + for (final FlowDifference upstream : upstreamDifferences) { + final String key = computeConflictKey(upstream); + sortedKeys.add("upstream:" + key); + } + + try { + final MessageDigest digest = MessageDigest.getInstance("SHA-256"); + for (final String key : sortedKeys) { + digest.update(key.getBytes(StandardCharsets.UTF_8)); + } + return HexFormat.of().formatHex(digest.digest()); + } catch (final NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 algorithm not available", e); + } + } + + private void registerHandler(final RebaseHandler handler) { + handlerRegistry.put(handler.getSupportedType(), handler); + } + + private static ObjectMapper createObjectMapper() { + final ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper; + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandler.java new file mode 100644 index 000000000000..acb0b3d47233 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedProcessGroup; + +import java.util.Set; + +public interface RebaseHandler { + + DifferenceType getSupportedType(); + + RebaseAnalysis.ClassifiedDifference classify(FlowDifference localDifference, Set upstreamDifferences, VersionedProcessGroup targetSnapshot); + + void apply(FlowDifference localDifference, VersionedProcessGroup mergedFlow); +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandlerUtils.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandlerUtils.java new file mode 100644 index 000000000000..f870ea4b8a86 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/RebaseHandlerUtils.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedConfigurableComponent; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedControllerService; +import org.apache.nifi.flow.VersionedFunnel; +import org.apache.nifi.flow.VersionedLabel; +import org.apache.nifi.flow.VersionedPort; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedRemoteProcessGroup; + +class RebaseHandlerUtils { + + private RebaseHandlerUtils() { + } + + static VersionedComponent findComponentById(final VersionedProcessGroup group, final String identifier) { + if (identifier.equals(group.getIdentifier())) { + return group; + } + + for (final VersionedProcessor processor : group.getProcessors()) { + if (identifier.equals(processor.getIdentifier())) { + return processor; + } + } + for (final VersionedConnection connection : group.getConnections()) { + if (identifier.equals(connection.getIdentifier())) { + return connection; + } + } + for (final VersionedLabel label : group.getLabels()) { + if (identifier.equals(label.getIdentifier())) { + return label; + } + } + for (final VersionedFunnel funnel : group.getFunnels()) { + if (identifier.equals(funnel.getIdentifier())) { + return funnel; + } + } + for (final VersionedPort inputPort : group.getInputPorts()) { + if (identifier.equals(inputPort.getIdentifier())) { + return inputPort; + } + } + for (final VersionedPort outputPort : group.getOutputPorts()) { + if (identifier.equals(outputPort.getIdentifier())) { + return outputPort; + } + } + for (final VersionedControllerService service : group.getControllerServices()) { + if (identifier.equals(service.getIdentifier())) { + return service; + } + } + for (final VersionedRemoteProcessGroup remoteProcessGroup : group.getRemoteProcessGroups()) { + if (identifier.equals(remoteProcessGroup.getIdentifier())) { + return remoteProcessGroup; + } + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + final VersionedComponent result = findComponentById(childGroup, identifier); + if (result != null) { + return result; + } + } + + return null; + } + + static VersionedConnection findConnectionById(final VersionedProcessGroup group, final String identifier) { + for (final VersionedConnection connection : group.getConnections()) { + if (identifier.equals(connection.getIdentifier())) { + return connection; + } + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + final VersionedConnection result = findConnectionById(childGroup, identifier); + if (result != null) { + return result; + } + } + + return null; + } + + static VersionedConfigurableComponent findConfigurableComponentById(final VersionedProcessGroup group, final String identifier) { + for (final VersionedProcessor processor : group.getProcessors()) { + if (identifier.equals(processor.getIdentifier())) { + return processor; + } + } + for (final VersionedControllerService service : group.getControllerServices()) { + if (identifier.equals(service.getIdentifier())) { + return service; + } + } + + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + final VersionedConfigurableComponent result = findConfigurableComponentById(childGroup, identifier); + if (result != null) { + return result; + } + } + + return null; + } + +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/SizeChangedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/SizeChangedRebaseHandler.java new file mode 100644 index 000000000000..c58cbf378ae3 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/SizeChangedRebaseHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedLabel; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +public class SizeChangedRebaseHandler implements RebaseHandler { + + private static final Logger logger = LoggerFactory.getLogger(SizeChangedRebaseHandler.class); + + @Override + public DifferenceType getSupportedType() { + return DifferenceType.SIZE_CHANGED; + } + + @Override + public RebaseAnalysis.ClassifiedDifference classify(final FlowDifference localDifference, final Set upstreamDifferences, + final VersionedProcessGroup targetSnapshot) { + return RebaseAnalysis.ClassifiedDifference.compatible(localDifference); + } + + @Override + public void apply(final FlowDifference localDifference, final VersionedProcessGroup mergedFlow) { + final String componentIdentifier = localDifference.getComponentB().getIdentifier(); + final VersionedComponent component = RebaseHandlerUtils.findComponentById(mergedFlow, componentIdentifier); + if (component == null) { + logger.warn("Unable to apply size change: component [{}] not found in merged flow", componentIdentifier); + return; + } + + if (component instanceof VersionedLabel) { + final VersionedLabel targetLabel = (VersionedLabel) component; + final VersionedLabel localLabel = (VersionedLabel) localDifference.getComponentB(); + targetLabel.setWidth(localLabel.getWidth()); + targetLabel.setHeight(localLabel.getHeight()); + } else { + logger.warn("Unable to apply size change: component [{}] is not a VersionedLabel", componentIdentifier); + } + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestPropertyChangedRebaseHandler.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestPropertyChangedRebaseHandler.java new file mode 100644 index 000000000000..c78e7d661f6c --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestPropertyChangedRebaseHandler.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class TestPropertyChangedRebaseHandler { + + private PropertyChangedRebaseHandler handler; + + @BeforeEach + public void setup() { + handler = new PropertyChangedRebaseHandler(); + } + + @Test + public void testNoUpstreamConflict() { + final VersionedProcessor processor = createProcessorWithProperty("proc-a", "propX", "oldVal"); + final VersionedProcessor localProcessor = createProcessorWithProperty("proc-a", "propX", "newVal"); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "propX", + "oldVal", "newVal", "Property propX changed"); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "propX", "oldVal")); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, Collections.emptySet(), targetSnapshot); + + assertEquals(RebaseClassification.COMPATIBLE, result.getClassification()); + } + + @Test + public void testDifferentPropertyOnSameComponent() { + final VersionedProcessor processor = createProcessorWithProperty("proc-a", "propX", "oldVal"); + final VersionedProcessor localProcessor = createProcessorWithProperty("proc-a", "propX", "newVal"); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "propX", + "oldVal", "newVal", "Property propX changed locally"); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, processor, "propY", + "oldY", "newY", "Property propY changed upstream")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "propX", "oldVal")); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, upstreamDifferences, targetSnapshot); + + assertEquals(RebaseClassification.COMPATIBLE, result.getClassification()); + } + + @Test + public void testSamePropertyOnSameComponent() { + final VersionedProcessor processor = createProcessorWithProperty("proc-a", "propX", "original"); + final VersionedProcessor localProcessor = createProcessorWithProperty("proc-a", "propX", "localVal"); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "propX", + "original", "localVal", "Property propX changed locally"); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, processor, "propX", + "original", "upstreamVal", "Property propX changed upstream")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, upstreamDifferences, targetSnapshot); + + assertEquals(RebaseClassification.CONFLICTING, result.getClassification()); + assertEquals("SAME_PROPERTY", result.getConflictCode()); + } + + @Test + public void testSamePropertyOnDifferentComponent() { + final VersionedProcessor processorA = createProcessorWithProperty("proc-a", "propX", "oldVal"); + final VersionedProcessor localProcessorA = createProcessorWithProperty("proc-a", "propX", "newVal"); + + final VersionedProcessor processorB = createProcessorWithProperty("proc-b", "propX", "oldVal"); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorA, localProcessorA, "propX", + "oldVal", "newVal", "Property propX changed on proc-a"); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorB, processorB, "propX", + "oldVal", "upstreamVal", "Property propX changed on proc-b")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "propX", "oldVal")); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, upstreamDifferences, targetSnapshot); + + assertEquals(RebaseClassification.COMPATIBLE, result.getClassification()); + } + + @Test + public void testSensitivePropertyNullRegistryValueWithDescriptorPresent() { + final VersionedProcessor processor = createProcessorWithSensitiveProperty("proc-a", "secretProp", true); + final VersionedProcessor localProcessor = createProcessorWithSensitiveProperty("proc-a", "secretProp", true); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "secretProp", + null, "newSecret", "Sensitive property secretProp changed"); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithSensitiveProperty("proc-a", "secretProp", true)); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, Collections.emptySet(), targetSnapshot); + + assertEquals(RebaseClassification.COMPATIBLE, result.getClassification()); + } + + @Test + public void testSensitivePropertyDescriptorRemovedInTarget() { + final VersionedProcessor processor = createProcessorWithSensitiveProperty("proc-a", "secretProp", true); + final VersionedProcessor localProcessor = createProcessorWithSensitiveProperty("proc-a", "secretProp", true); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "secretProp", + null, "newSecret", "Sensitive property secretProp changed"); + + final VersionedProcessor targetProcessor = new VersionedProcessor(); + targetProcessor.setIdentifier("proc-a"); + targetProcessor.setProperties(Collections.emptyMap()); + targetProcessor.setPropertyDescriptors(Collections.emptyMap()); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(targetProcessor); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, Collections.emptySet(), targetSnapshot); + + assertEquals(RebaseClassification.UNSUPPORTED, result.getClassification()); + assertEquals("DESCRIPTOR_CHANGED", result.getConflictCode()); + } + + @Test + public void testApplySetsPropertyValueCorrectly() { + final VersionedProcessor processor = createProcessorWithProperty("proc-a", "propX", "oldVal"); + final VersionedProcessor localProcessor = createProcessorWithProperty("proc-a", "propX", "newVal"); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "propX", + "oldVal", "newVal", "Property propX changed"); + + final VersionedProcessGroup mergedFlow = new VersionedProcessGroup(); + mergedFlow.setIdentifier("root"); + mergedFlow.getProcessors().add(createProcessorWithProperty("proc-a", "propX", "oldVal")); + + handler.apply(localDifference, mergedFlow); + + final VersionedProcessor mergedProcessor = mergedFlow.getProcessors().iterator().next(); + assertNotNull(mergedProcessor); + assertEquals("newVal", mergedProcessor.getProperties().get("propX")); + } + + @Test + public void testMultipleUpstreamChangesNoneMatching() { + final VersionedProcessor processorA = createProcessorWithProperty("proc-a", "propX", "oldVal"); + final VersionedProcessor localProcessorA = createProcessorWithProperty("proc-a", "propX", "newVal"); + + final VersionedProcessor processorB = createProcessorWithProperty("proc-b", "propY", "oldY"); + final VersionedProcessor processorC = createProcessorWithProperty("proc-c", "propZ", "oldZ"); + + final FlowDifference localDifference = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorA, localProcessorA, "propX", + "oldVal", "newVal", "Property propX changed on proc-a"); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorB, processorB, "propY", + "oldY", "newY", "Property propY changed on proc-b")); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorC, processorC, "propZ", + "oldZ", "newZ", "Property propZ changed on proc-c")); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, + processorA, processorA, null, null, "Position changed on proc-a")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "propX", "oldVal")); + + final RebaseAnalysis.ClassifiedDifference result = handler.classify(localDifference, upstreamDifferences, targetSnapshot); + + assertEquals(RebaseClassification.COMPATIBLE, result.getClassification()); + } + + private VersionedProcessor createProcessorWithProperty(final String identifier, final String propertyName, final String propertyValue) { + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier(identifier); + + final Map properties = new HashMap<>(); + properties.put(propertyName, propertyValue); + processor.setProperties(properties); + + final VersionedPropertyDescriptor descriptor = new VersionedPropertyDescriptor(); + descriptor.setName(propertyName); + descriptor.setSensitive(false); + + final Map descriptors = new HashMap<>(); + descriptors.put(propertyName, descriptor); + processor.setPropertyDescriptors(descriptors); + + return processor; + } + + private VersionedProcessor createProcessorWithSensitiveProperty(final String identifier, final String propertyName, final boolean sensitive) { + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier(identifier); + + final Map properties = new HashMap<>(); + properties.put(propertyName, null); + processor.setProperties(properties); + + final VersionedPropertyDescriptor descriptor = new VersionedPropertyDescriptor(); + descriptor.setName(propertyName); + descriptor.setSensitive(sensitive); + + final Map descriptors = new HashMap<>(); + descriptors.put(propertyName, descriptor); + processor.setPropertyDescriptors(descriptors); + + return processor; + } +} diff --git a/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestRebaseEngine.java b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestRebaseEngine.java new file mode 100644 index 000000000000..cd21a4bae0a8 --- /dev/null +++ b/nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestRebaseEngine.java @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.flow.diff; + +import org.apache.nifi.flow.Position; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedLabel; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flow.VersionedProcessor; +import org.apache.nifi.flow.VersionedPropertyDescriptor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestRebaseEngine { + + private RebaseEngine engine; + + @BeforeEach + public void setup() { + engine = new RebaseEngine(); + } + + @Test + public void testCompatiblePositionChangeNoUpstreamConflict() { + final VersionedProcessor processorA = createProcessor("proc-a", "ProcessorA"); + processorA.setPosition(new Position(100.0, 200.0)); + + final VersionedProcessor localProcessorA = createProcessor("proc-a", "ProcessorA"); + localProcessorA.setPosition(new Position(300.0, 400.0)); + + final VersionedProcessor upstreamProcessorB = createProcessor("proc-b", "ProcessorB"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processorA, localProcessorA, + new Position(100.0, 200.0), new Position(300.0, 400.0), "Position changed on ProcessorA")); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.COMPONENT_ADDED, null, upstreamProcessorB, + null, upstreamProcessorB, "ProcessorB added upstream")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessor("proc-a", "ProcessorA")); + targetSnapshot.getProcessors().add(createProcessor("proc-b", "ProcessorB")); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(1, analysis.getClassifiedLocalChanges().size()); + assertEquals(RebaseClassification.COMPATIBLE, analysis.getClassifiedLocalChanges().get(0).getClassification()); + + final VersionedProcessGroup merged = analysis.getMergedSnapshot(); + assertNotNull(merged); + final VersionedProcessor mergedProcA = findProcessorById(merged, "proc-a"); + assertNotNull(mergedProcA); + assertEquals(300.0, mergedProcA.getPosition().getX()); + assertEquals(400.0, mergedProcA.getPosition().getY()); + } + + @Test + public void testCompatiblePropertyChangeOnDifferentProperties() { + final VersionedProcessor processorA = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "oldValueX"); + final VersionedProcessor localProcessorA = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "newValueX"); + + final VersionedProcessor upstreamProcessorB = createProcessorWithProperty("proc-b", "ProcessorB", "propY", "oldValueY"); + final VersionedProcessor upstreamProcessorBNew = createProcessorWithProperty("proc-b", "ProcessorB", "propY", "newValueY"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorA, localProcessorA, "propX", + "oldValueX", "newValueX", "Property propX changed on ProcessorA")); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, upstreamProcessorB, upstreamProcessorBNew, "propY", + "oldValueY", "newValueY", "Property propY changed on ProcessorB")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "ProcessorA", "propX", "oldValueX")); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-b", "ProcessorB", "propY", "newValueY")); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(1, analysis.getClassifiedLocalChanges().size()); + assertEquals(RebaseClassification.COMPATIBLE, analysis.getClassifiedLocalChanges().get(0).getClassification()); + } + + @Test + public void testConflictingPropertyChangeOnSamePropertyAndComponent() { + final VersionedProcessor processor = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "original"); + final VersionedProcessor localProcessor = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "localValue"); + final VersionedProcessor upstreamProcessor = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "upstreamValue"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, localProcessor, "propX", + "original", "localValue", "Property propX changed locally")); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, upstreamProcessor, "propX", + "original", "upstreamValue", "Property propX changed upstream")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "ProcessorA", "propX", "upstreamValue")); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + + assertFalse(analysis.isRebaseAllowed()); + assertEquals(1, analysis.getClassifiedLocalChanges().size()); + assertEquals(RebaseClassification.CONFLICTING, analysis.getClassifiedLocalChanges().get(0).getClassification()); + assertNull(analysis.getMergedSnapshot()); + } + + @Test + public void testUnsupportedDifferenceTypeNoHandler() { + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.COMPONENT_ADDED, null, processor, + null, processor, "Component added locally")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertFalse(analysis.isRebaseAllowed()); + assertEquals(1, analysis.getClassifiedLocalChanges().size()); + + final RebaseAnalysis.ClassifiedDifference classified = analysis.getClassifiedLocalChanges().get(0); + assertEquals(RebaseClassification.UNSUPPORTED, classified.getClassification()); + assertEquals("NO_HANDLER", classified.getConflictCode()); + assertNull(analysis.getMergedSnapshot()); + } + + @Test + public void testMixedCompatibleAndUnsupported() { + final VersionedProcessor processorA = createProcessor("proc-a", "ProcessorA"); + processorA.setPosition(new Position(10.0, 20.0)); + final VersionedProcessor localProcessorA = createProcessor("proc-a", "ProcessorA"); + localProcessorA.setPosition(new Position(50.0, 60.0)); + + final VersionedProcessor processorB = createProcessor("proc-b", "ProcessorB"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processorA, localProcessorA, + new Position(10.0, 20.0), new Position(50.0, 60.0), "Position changed")); + localDifferences.add(new StandardFlowDifference(DifferenceType.COMPONENT_ADDED, null, processorB, + null, processorB, "Component added")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessor("proc-a", "ProcessorA")); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertFalse(analysis.isRebaseAllowed()); + assertNull(analysis.getMergedSnapshot()); + } + + @Test + public void testMixedCompatibleAndConflicting() { + final VersionedProcessor processorA = createProcessor("proc-a", "ProcessorA"); + processorA.setPosition(new Position(10.0, 20.0)); + final VersionedProcessor localProcessorA = createProcessor("proc-a", "ProcessorA"); + localProcessorA.setPosition(new Position(50.0, 60.0)); + + final VersionedProcessor processorB = createProcessorWithProperty("proc-b", "ProcessorB", "propX", "original"); + final VersionedProcessor localProcessorB = createProcessorWithProperty("proc-b", "ProcessorB", "propX", "localVal"); + final VersionedProcessor upstreamProcessorB = createProcessorWithProperty("proc-b", "ProcessorB", "propX", "upstreamVal"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processorA, localProcessorA, + new Position(10.0, 20.0), new Position(50.0, 60.0), "Position changed")); + localDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorB, localProcessorB, "propX", + "original", "localVal", "Property changed locally")); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorB, upstreamProcessorB, "propX", + "original", "upstreamVal", "Property changed upstream")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + + assertFalse(analysis.isRebaseAllowed()); + assertNull(analysis.getMergedSnapshot()); + } + + @Test + public void testAllCompatibleMultipleLocalChanges() { + final VersionedProcessor processorA = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "oldVal"); + processorA.setPosition(new Position(10.0, 20.0)); + processorA.setComments("old comments"); + + final VersionedProcessor localProcessorA = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "newVal"); + localProcessorA.setPosition(new Position(50.0, 60.0)); + localProcessorA.setComments("new comments"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processorA, localProcessorA, + new Position(10.0, 20.0), new Position(50.0, 60.0), "Position changed")); + localDifferences.add(new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorA, localProcessorA, "propX", + "oldVal", "newVal", "Property propX changed")); + localDifferences.add(new StandardFlowDifference(DifferenceType.COMMENTS_CHANGED, processorA, localProcessorA, + "old comments", "new comments", "Comments changed")); + + final VersionedProcessor targetProcessor = createProcessorWithProperty("proc-a", "ProcessorA", "propX", "oldVal"); + targetProcessor.setPosition(new Position(10.0, 20.0)); + targetProcessor.setComments("old comments"); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(targetProcessor); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(3, analysis.getClassifiedLocalChanges().size()); + for (final RebaseAnalysis.ClassifiedDifference classified : analysis.getClassifiedLocalChanges()) { + assertEquals(RebaseClassification.COMPATIBLE, classified.getClassification()); + } + + final VersionedProcessGroup merged = analysis.getMergedSnapshot(); + assertNotNull(merged); + final VersionedProcessor mergedProc = findProcessorById(merged, "proc-a"); + assertNotNull(mergedProc); + assertEquals(50.0, mergedProc.getPosition().getX()); + assertEquals(60.0, mergedProc.getPosition().getY()); + assertEquals("newVal", mergedProc.getProperties().get("propX")); + assertEquals("new comments", mergedProc.getComments()); + } + + @Test + public void testEmptyLocalChanges() { + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessor("proc-a", "ProcessorA")); + + final RebaseAnalysis analysis = engine.analyze(Collections.emptySet(), Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertTrue(analysis.getClassifiedLocalChanges().isEmpty()); + assertNotNull(analysis.getMergedSnapshot()); + assertEquals("root", analysis.getMergedSnapshot().getIdentifier()); + } + + @Test + public void testCanonicalConflictKeySameComponentDifferentProperties() { + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + + final FlowDifference diffPropX = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, processor, "propX", + "old", "new", "Property propX changed"); + final FlowDifference diffPropY = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processor, processor, "propY", + "old", "new", "Property propY changed"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(diffPropX); + localDifferences.add(diffPropY); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "ProcessorA", "propX", "old")); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(2, analysis.getClassifiedLocalChanges().size()); + for (final RebaseAnalysis.ClassifiedDifference classified : analysis.getClassifiedLocalChanges()) { + assertEquals(RebaseClassification.COMPATIBLE, classified.getClassification()); + } + } + + @Test + public void testCanonicalConflictKeySamePropertyDifferentComponents() { + final VersionedProcessor processorA = createProcessor("proc-a", "ProcessorA"); + final VersionedProcessor processorB = createProcessor("proc-b", "ProcessorB"); + + final FlowDifference diffA = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorA, processorA, "propX", + "old", "new", "Property propX changed on A"); + final FlowDifference diffB = new StandardFlowDifference(DifferenceType.PROPERTY_CHANGED, processorB, processorB, "propX", + "old", "new", "Property propX changed on B"); + + final String keyA = RebaseEngine.computeConflictKey(diffA); + final String keyB = RebaseEngine.computeConflictKey(diffB); + assertNotEquals(keyA, keyB); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(diffA); + localDifferences.add(diffB); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-a", "ProcessorA", "propX", "old")); + targetSnapshot.getProcessors().add(createProcessorWithProperty("proc-b", "ProcessorB", "propX", "old")); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(2, analysis.getClassifiedLocalChanges().size()); + for (final RebaseAnalysis.ClassifiedDifference classified : analysis.getClassifiedLocalChanges()) { + assertEquals(RebaseClassification.COMPATIBLE, classified.getClassification()); + } + } + + @Test + public void testAnalysisFingerprintDeterminism() { + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + processor.setPosition(new Position(10.0, 20.0)); + final VersionedProcessor localProcessor = createProcessor("proc-a", "ProcessorA"); + localProcessor.setPosition(new Position(50.0, 60.0)); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processor, localProcessor, + new Position(10.0, 20.0), new Position(50.0, 60.0), "Position changed")); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.COMPONENT_ADDED, null, createProcessor("proc-b", "ProcessorB"), + null, null, "Upstream added proc-b")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessor("proc-a", "ProcessorA")); + + final RebaseAnalysis analysis1 = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + final RebaseAnalysis analysis2 = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + + assertNotNull(analysis1.getAnalysisFingerprint()); + assertNotNull(analysis2.getAnalysisFingerprint()); + assertEquals(analysis1.getAnalysisFingerprint(), analysis2.getAnalysisFingerprint()); + } + + @Test + public void testAnalysisFingerprintChangesWithDifferentInputs() { + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + processor.setPosition(new Position(10.0, 20.0)); + final VersionedProcessor localProcessor = createProcessor("proc-a", "ProcessorA"); + localProcessor.setPosition(new Position(50.0, 60.0)); + + final Set localDifferences1 = new HashSet<>(); + localDifferences1.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processor, localProcessor, + new Position(10.0, 20.0), new Position(50.0, 60.0), "Position changed")); + + final Set localDifferences2 = new HashSet<>(); + localDifferences2.add(new StandardFlowDifference(DifferenceType.COMMENTS_CHANGED, processor, localProcessor, + "old", "new", "Comments changed")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessors().add(createProcessor("proc-a", "ProcessorA")); + + final RebaseAnalysis analysis1 = engine.analyze(localDifferences1, Collections.emptySet(), targetSnapshot); + final RebaseAnalysis analysis2 = engine.analyze(localDifferences2, Collections.emptySet(), targetSnapshot); + + assertNotEquals(analysis1.getAnalysisFingerprint(), analysis2.getAnalysisFingerprint()); + } + + @Test + public void testDeepCloneIndependence() { + final VersionedProcessGroup original = new VersionedProcessGroup(); + original.setIdentifier("root"); + original.setComments("original comments"); + + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + processor.setPosition(new Position(10.0, 20.0)); + original.getProcessors().add(processor); + + final VersionedProcessGroup cloned = engine.deepClone(original); + + cloned.setComments("modified comments"); + final VersionedProcessor clonedProcessor = cloned.getProcessors().iterator().next(); + clonedProcessor.setPosition(new Position(999.0, 999.0)); + + assertEquals("original comments", original.getComments()); + final VersionedProcessor originalProcessor = original.getProcessors().iterator().next(); + assertEquals(10.0, originalProcessor.getPosition().getX()); + assertEquals(20.0, originalProcessor.getPosition().getY()); + } + + @Test + public void testNestedComponentResolutionPositionChange() { + final VersionedProcessor processor = createProcessor("nested-proc", "NestedProcessor"); + processor.setPosition(new Position(10.0, 20.0)); + final VersionedProcessor localProcessor = createProcessor("nested-proc", "NestedProcessor"); + localProcessor.setPosition(new Position(100.0, 200.0)); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.POSITION_CHANGED, processor, localProcessor, + new Position(10.0, 20.0), new Position(100.0, 200.0), "Position changed on nested processor")); + + final VersionedProcessGroup childGroup = new VersionedProcessGroup(); + childGroup.setIdentifier("child-pg"); + final VersionedProcessor targetNestedProcessor = createProcessor("nested-proc", "NestedProcessor"); + targetNestedProcessor.setPosition(new Position(10.0, 20.0)); + childGroup.getProcessors().add(targetNestedProcessor); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getProcessGroups().add(childGroup); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertNotNull(analysis.getMergedSnapshot()); + + final VersionedProcessGroup mergedChild = analysis.getMergedSnapshot().getProcessGroups().iterator().next(); + final VersionedProcessor mergedNestedProc = mergedChild.getProcessors().iterator().next(); + assertEquals(100.0, mergedNestedProc.getPosition().getX()); + assertEquals(200.0, mergedNestedProc.getPosition().getY()); + } + + @Test + public void testSizeChangeCompatibility() { + final VersionedLabel originalLabel = createLabel("label-a", 100.0, 50.0); + final VersionedLabel localLabel = createLabel("label-a", 200.0, 100.0); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.SIZE_CHANGED, originalLabel, localLabel, + null, null, "Size changed on label")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getLabels().add(createLabel("label-a", 100.0, 50.0)); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(RebaseClassification.COMPATIBLE, analysis.getClassifiedLocalChanges().get(0).getClassification()); + assertNotNull(analysis.getMergedSnapshot()); + + final VersionedLabel mergedLabel = analysis.getMergedSnapshot().getLabels().iterator().next(); + assertEquals(200.0, mergedLabel.getWidth()); + assertEquals(100.0, mergedLabel.getHeight()); + } + + @Test + public void testBendpointsChangeCompatibility() { + final VersionedConnection originalConn = createConnection("conn-a"); + originalConn.setBends(List.of(new Position(10.0, 10.0))); + + final VersionedConnection localConn = createConnection("conn-a"); + final List localBends = List.of(new Position(50.0, 50.0), new Position(75.0, 75.0)); + localConn.setBends(localBends); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.BENDPOINTS_CHANGED, originalConn, localConn, + originalConn.getBends(), localConn.getBends(), "Bendpoints changed")); + + final VersionedConnection targetConn = createConnection("conn-a"); + targetConn.setBends(List.of(new Position(10.0, 10.0))); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + targetSnapshot.getConnections().add(targetConn); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(RebaseClassification.COMPATIBLE, analysis.getClassifiedLocalChanges().get(0).getClassification()); + assertNotNull(analysis.getMergedSnapshot()); + + final VersionedConnection mergedConn = analysis.getMergedSnapshot().getConnections().iterator().next(); + assertEquals(2, mergedConn.getBends().size()); + } + + @Test + public void testCommentsChangeCompatibility() { + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + processor.setComments("old comments"); + final VersionedProcessor localProcessor = createProcessor("proc-a", "ProcessorA"); + localProcessor.setComments("new comments"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.COMMENTS_CHANGED, processor, localProcessor, + "old comments", "new comments", "Comments changed")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + final VersionedProcessor targetProcessor = createProcessor("proc-a", "ProcessorA"); + targetProcessor.setComments("old comments"); + targetSnapshot.getProcessors().add(targetProcessor); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, Collections.emptySet(), targetSnapshot); + + assertTrue(analysis.isRebaseAllowed()); + assertEquals(RebaseClassification.COMPATIBLE, analysis.getClassifiedLocalChanges().get(0).getClassification()); + assertNotNull(analysis.getMergedSnapshot()); + + final VersionedProcessor mergedProc = findProcessorById(analysis.getMergedSnapshot(), "proc-a"); + assertNotNull(mergedProc); + assertEquals("new comments", mergedProc.getComments()); + } + + @Test + public void testCommentsChangeConflict() { + final VersionedProcessor processor = createProcessor("proc-a", "ProcessorA"); + processor.setComments("original"); + final VersionedProcessor localProcessor = createProcessor("proc-a", "ProcessorA"); + localProcessor.setComments("local comments"); + final VersionedProcessor upstreamProcessor = createProcessor("proc-a", "ProcessorA"); + upstreamProcessor.setComments("upstream comments"); + + final Set localDifferences = new HashSet<>(); + localDifferences.add(new StandardFlowDifference(DifferenceType.COMMENTS_CHANGED, processor, localProcessor, + "original", "local comments", "Comments changed locally")); + + final Set upstreamDifferences = new HashSet<>(); + upstreamDifferences.add(new StandardFlowDifference(DifferenceType.COMMENTS_CHANGED, processor, upstreamProcessor, + "original", "upstream comments", "Comments changed upstream")); + + final VersionedProcessGroup targetSnapshot = new VersionedProcessGroup(); + targetSnapshot.setIdentifier("root"); + + final RebaseAnalysis analysis = engine.analyze(localDifferences, upstreamDifferences, targetSnapshot); + + assertFalse(analysis.isRebaseAllowed()); + assertEquals(1, analysis.getClassifiedLocalChanges().size()); + assertEquals(RebaseClassification.CONFLICTING, analysis.getClassifiedLocalChanges().get(0).getClassification()); + assertNull(analysis.getMergedSnapshot()); + } + + private VersionedProcessor createProcessor(final String identifier, final String name) { + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier(identifier); + processor.setName(name); + processor.setProperties(Collections.emptyMap()); + processor.setPropertyDescriptors(Collections.emptyMap()); + return processor; + } + + private VersionedProcessor createProcessorWithProperty(final String identifier, final String name, final String propertyName, final String propertyValue) { + final VersionedProcessor processor = new VersionedProcessor(); + processor.setIdentifier(identifier); + processor.setName(name); + + final Map properties = new HashMap<>(); + properties.put(propertyName, propertyValue); + processor.setProperties(properties); + + final VersionedPropertyDescriptor descriptor = new VersionedPropertyDescriptor(); + descriptor.setName(propertyName); + descriptor.setSensitive(false); + + final Map descriptors = new HashMap<>(); + descriptors.put(propertyName, descriptor); + processor.setPropertyDescriptors(descriptors); + + return processor; + } + + private VersionedLabel createLabel(final String identifier, final double width, final double height) { + final VersionedLabel label = new VersionedLabel(); + label.setIdentifier(identifier); + label.setWidth(width); + label.setHeight(height); + return label; + } + + private VersionedConnection createConnection(final String identifier) { + final VersionedConnection connection = new VersionedConnection(); + connection.setIdentifier(identifier); + return connection; + } + + private VersionedProcessor findProcessorById(final VersionedProcessGroup group, final String identifier) { + for (final VersionedProcessor processor : group.getProcessors()) { + if (identifier.equals(processor.getIdentifier())) { + return processor; + } + } + for (final VersionedProcessGroup childGroup : group.getProcessGroups()) { + final VersionedProcessor result = findProcessorById(childGroup, identifier); + if (result != null) { + return result; + } + } + return null; + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 2c3181a96db6..9a4aa93303fb 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -106,6 +106,8 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; +import org.apache.nifi.web.api.entity.RebaseRequestEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.ReportingTaskEntity; import org.apache.nifi.web.api.entity.ReportingTaskRunStatusEntity; @@ -2038,6 +2040,50 @@ private VersionedFlowUpdateRequestEntity waitForFlowRevertCompleted(final String } } + public RebaseAnalysisEntity getRebaseAnalysis(final String processGroupId, final String targetVersion) throws NiFiClientException, IOException { + return nifiClient.getVersionsClient().getRebaseAnalysis(processGroupId, targetVersion); + } + + public VersionedFlowUpdateRequestEntity rebaseFlowVersion(final String processGroupId, final String targetVersion) throws NiFiClientException, IOException, InterruptedException { + logger.info("Submitting Rebase Flow Version request to rebase Group with ID {} to Version {}", processGroupId, targetVersion); + + final ProcessGroupEntity groupEntity = nifiClient.getProcessGroupClient().getProcessGroup(processGroupId); + final ProcessGroupDTO groupDto = groupEntity.getComponent(); + final VersionControlInformationDTO vciDto = groupDto.getVersionControlInformation(); + if (vciDto == null) { + throw new IllegalArgumentException("Process Group with ID " + processGroupId + " is not under Version Control"); + } + + vciDto.setVersion(targetVersion); + + final VersionControlInformationEntity vciEntity = new VersionControlInformationEntity(); + vciEntity.setProcessGroupRevision(groupEntity.getRevision()); + vciEntity.setVersionControlInformation(vciDto); + + final RebaseAnalysisEntity analysis = nifiClient.getVersionsClient().getRebaseAnalysis(processGroupId, targetVersion); + + final RebaseRequestEntity rebaseRequest = new RebaseRequestEntity(); + rebaseRequest.setVersionControlInformationEntity(vciEntity); + rebaseRequest.setAnalysisFingerprint(analysis.getAnalysisFingerprint()); + + final VersionedFlowUpdateRequestEntity result = nifiClient.getVersionsClient().initiateRebase(processGroupId, rebaseRequest); + return waitForRebaseCompleted(result.getRequest().getRequestId()); + } + + private VersionedFlowUpdateRequestEntity waitForRebaseCompleted(final String requestId) throws NiFiClientException, IOException, InterruptedException { + final VersionsClient versionsClient = nifiClient.getVersionsClient(); + + while (true) { + final VersionedFlowUpdateRequestEntity entity = versionsClient.getRebaseRequest(requestId); + + if (entity.getRequest().isComplete()) { + return entity; + } + + Thread.sleep(100L); + } + } + public ProcessGroupEntity importFlowFromRegistry(final String parentGroupId, final VersionControlInformationDTO vciDto) throws NiFiClientException, IOException { return importFlowFromRegistry(parentGroupId, vciDto.getRegistryId(), vciDto.getBucketId(), vciDto.getFlowId(), vciDto.getVersion()); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RebaseVersionIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RebaseVersionIT.java new file mode 100644 index 000000000000..d4a4e10d44dc --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RebaseVersionIT.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.registry; + +import org.apache.nifi.tests.system.NiFiClientUtil; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.RebaseChangeDTO; +import org.apache.nifi.web.api.dto.VersionControlInformationDTO; +import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; +import org.apache.nifi.web.api.entity.RebaseRequestEntity; +import org.apache.nifi.web.api.entity.VersionControlInformationEntity; +import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RebaseVersionIT extends NiFiSystemIT { + private static final String TEST_FLOWS_BUCKET = "test-flows"; + + @Test + public void testCleanRebaseWithPositionAndPropertyChanges() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "RebaseFlow"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("Batch Size", "5")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("File Size", "99 B")); + + final RebaseAnalysisEntity analysis = util.getRebaseAnalysis(originalGroup.getId(), "2"); + assertTrue(analysis.getRebaseAllowed(), "Expected rebase to be allowed but it was not. Failure: " + analysis.getFailureReason() + + ". Local changes: " + describeLocalChanges(analysis)); + + util.rebaseFlowVersion(originalGroup.getId(), "2"); + + final VersionControlInformationDTO updatedVci = getVersionControlInfo(originalGroup.getId()); + assertEquals("2", updatedVci.getVersion()); + + final ProcessorEntity rebasedProcessor = findSingleProcessor(originalGroup.getId()); + final Map properties = rebasedProcessor.getComponent().getConfig().getProperties(); + assertEquals("99 B", properties.get("File Size")); + assertEquals("5", properties.get("Batch Size")); + } + + @Test + public void testRejectedRebaseDueToConflictingProperty() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "ConflictFlow"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("File Size", "20 B")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("File Size", "50 B")); + + final RebaseAnalysisEntity analysis = util.getRebaseAnalysis(originalGroup.getId(), "2"); + assertFalse(analysis.getRebaseAllowed()); + + final boolean hasConflicting = analysis.getLocalChanges().stream() + .map(RebaseChangeDTO::getClassification) + .anyMatch("CONFLICTING"::equals); + assertTrue(hasConflicting); + } + + @Test + public void testRejectedRebaseDueToUnsupportedChangeType() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "UnsupportedFlow"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("File Size", "20 B")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.createProcessor("TerminateFlowFile", originalGroup.getId()); + + final RebaseAnalysisEntity analysis = util.getRebaseAnalysis(originalGroup.getId(), "2"); + assertFalse(analysis.getRebaseAllowed()); + + final boolean hasUnsupported = analysis.getLocalChanges().stream() + .map(RebaseChangeDTO::getClassification) + .anyMatch("UNSUPPORTED"::equals); + assertTrue(hasUnsupported); + } + + @Test + public void testRebaseFollowedByCommit() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "CommitAfterRebase"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("Batch Size", "5")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("File Size", "99 B")); + + util.rebaseFlowVersion(originalGroup.getId(), "2"); + + final VersionControlInformationDTO afterRebase = getVersionControlInfo(originalGroup.getId()); + assertEquals("2", afterRebase.getVersion()); + + util.saveFlowVersion(originalGroup, clientEntity, getVersionControlInformation(originalGroup.getId())); + + final VersionControlInformationDTO committedVci = getVersionControlInfo(originalGroup.getId()); + assertEquals("3", committedVci.getVersion()); + } + + @Test + public void testRebaseFollowedByRevert() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "RevertAfterRebase"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("Batch Size", "5")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("File Size", "99 B")); + + util.rebaseFlowVersion(originalGroup.getId(), "2"); + + final VersionControlInformationDTO afterRebase = getVersionControlInfo(originalGroup.getId()); + assertEquals("2", afterRebase.getVersion()); + + util.revertChanges(originalGroup); + + final VersionControlInformationDTO revertedVci = getVersionControlInfo(originalGroup.getId()); + assertEquals("2", revertedVci.getVersion()); + } + + @Test + public void testMultiVersionJump() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "MultiVersionFlow"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + + final ProcessorEntity v2Processor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(v2Processor, Map.of("File Size", "20 B")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + final ProcessorEntity v3Processor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(v3Processor, Map.of("Batch Size", "3")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + final ProcessorEntity v4Processor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(v4Processor, Map.of("Max FlowFiles", "10")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("Text", "hello")); + + final RebaseAnalysisEntity analysis = util.getRebaseAnalysis(originalGroup.getId(), "4"); + assertTrue(analysis.getRebaseAllowed(), "Expected rebase to be allowed but it was not. Failure: " + analysis.getFailureReason() + + ". Local changes: " + describeLocalChanges(analysis)); + + util.rebaseFlowVersion(originalGroup.getId(), "4"); + + final VersionControlInformationDTO updatedVci = getVersionControlInfo(originalGroup.getId()); + assertEquals("4", updatedVci.getVersion()); + + final ProcessorEntity rebasedProcessor = findSingleProcessor(originalGroup.getId()); + final Map properties = rebasedProcessor.getComponent().getConfig().getProperties(); + assertEquals("hello", properties.get("Text")); + assertEquals("20 B", properties.get("File Size")); + assertEquals("3", properties.get("Batch Size")); + assertEquals("10", properties.get("Max FlowFiles")); + } + + @Test + public void testStaleAnalysisFingerprintRejection() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "FingerprintFlow"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("Batch Size", "5")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("File Size", "99 B")); + + final RebaseAnalysisEntity initialAnalysis = util.getRebaseAnalysis(originalGroup.getId(), "2"); + final String staleFingerprint = initialAnalysis.getAnalysisFingerprint(); + + util.updateProcessorProperties(generate, Map.of("Max FlowFiles", "50")); + + boolean rebaseWithStaleFailed = false; + try { + executeRebaseWithFingerprint(originalGroup, "2", staleFingerprint); + } catch (final Exception e) { + rebaseWithStaleFailed = true; + } + assertTrue(rebaseWithStaleFailed); + + util.rebaseFlowVersion(originalGroup.getId(), "2"); + + final VersionControlInformationDTO updatedVci = getVersionControlInfo(originalGroup.getId()); + assertEquals("2", updatedVci.getVersion()); + } + + @Test + public void testNestedVersionedPGWithLocalModificationsBlocksRebase() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity parentGroup = util.createProcessGroup("Parent", "root"); + util.createProcessor("GenerateFlowFile", parentGroup.getId()); + + final ProcessGroupEntity childGroup = util.createProcessGroup("Child", parentGroup.getId()); + final ProcessorEntity childProcessor = util.createProcessor("TerminateFlowFile", childGroup.getId()); + util.startVersionControl(childGroup, clientEntity, TEST_FLOWS_BUCKET, "ChildFlow"); + + final VersionControlInformationEntity parentVci = util.startVersionControl(parentGroup, clientEntity, TEST_FLOWS_BUCKET, "ParentFlow"); + final String parentFlowId = parentVci.getVersionControlInformation().getFlowId(); + + util.updateProcessorProperties(childProcessor, Map.of("Response Status", "200")); + + final ProcessGroupEntity secondParent = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, parentFlowId, "1"); + final ProcessorEntity secondParentProcessor = findProcessorByType(secondParent.getId(), "GenerateFlowFile"); + util.updateProcessorProperties(secondParentProcessor, Map.of("File Size", "20 B")); + util.saveFlowVersion(secondParent, clientEntity, getVersionControlInformation(secondParent.getId())); + + final RebaseAnalysisEntity analysis = util.getRebaseAnalysis(parentGroup.getId(), "2"); + assertFalse(analysis.getRebaseAllowed(), "Rebase should be blocked due to descendant modifications"); + assertNotNull(analysis.getFailureReason()); + } + + @Test + public void testRebaseThenRebaseAgain() throws NiFiClientException, IOException, InterruptedException { + final FlowRegistryClientEntity clientEntity = registerClient(); + final NiFiClientUtil util = getClientUtil(); + + final ProcessGroupEntity originalGroup = util.createProcessGroup("Original", "root"); + final ProcessorEntity generate = util.createProcessor("GenerateFlowFile", originalGroup.getId()); + + final VersionControlInformationEntity vci = util.startVersionControl(originalGroup, clientEntity, TEST_FLOWS_BUCKET, "DoubleRebaseFlow"); + final String flowId = vci.getVersionControlInformation().getFlowId(); + + final ProcessGroupEntity secondGroup = util.importFlowFromRegistry("root", clientEntity.getId(), TEST_FLOWS_BUCKET, flowId, "1"); + final ProcessorEntity secondGroupProcessor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(secondGroupProcessor, Map.of("Batch Size", "5")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.updateProcessorProperties(generate, Map.of("File Size", "99 B")); + util.rebaseFlowVersion(originalGroup.getId(), "2"); + + final VersionControlInformationDTO afterFirstRebase = getVersionControlInfo(originalGroup.getId()); + assertEquals("2", afterFirstRebase.getVersion()); + + final ProcessorEntity v3Processor = findSingleProcessor(secondGroup.getId()); + util.updateProcessorProperties(v3Processor, Map.of("Max FlowFiles", "30")); + util.saveFlowVersion(secondGroup, clientEntity, getVersionControlInformation(secondGroup.getId())); + + util.rebaseFlowVersion(originalGroup.getId(), "3"); + + final VersionControlInformationDTO afterSecondRebase = getVersionControlInfo(originalGroup.getId()); + assertEquals("3", afterSecondRebase.getVersion()); + + final ProcessorEntity rebasedProcessor = findSingleProcessor(originalGroup.getId()); + final Map properties = rebasedProcessor.getComponent().getConfig().getProperties(); + assertEquals("99 B", properties.get("File Size")); + assertEquals("5", properties.get("Batch Size")); + assertEquals("30", properties.get("Max FlowFiles")); + } + + private String describeLocalChanges(final RebaseAnalysisEntity analysis) { + if (analysis.getLocalChanges() == null || analysis.getLocalChanges().isEmpty()) { + return "none"; + } + final StringBuilder sb = new StringBuilder(); + for (final RebaseChangeDTO change : analysis.getLocalChanges()) { + if (!sb.isEmpty()) { + sb.append(", "); + } + sb.append("[%s %s on %s = %s]".formatted(change.getClassification(), change.getDifferenceType(), change.getComponentName(), change.getConflictDetail())); + } + return sb.toString(); + } + + private VersionControlInformationDTO getVersionControlInfo(final String processGroupId) throws NiFiClientException, IOException { + return getNifiClient().getProcessGroupClient().getProcessGroup(processGroupId) + .getComponent().getVersionControlInformation(); + } + + private VersionControlInformationEntity getVersionControlInformation(final String processGroupId) throws NiFiClientException, IOException { + return getNifiClient().getVersionsClient().getVersionControlInfo(processGroupId); + } + + private ProcessorEntity findSingleProcessor(final String processGroupId) throws NiFiClientException, IOException { + final FlowDTO flow = getNifiClient().getFlowClient().getProcessGroup(processGroupId).getProcessGroupFlow().getFlow(); + final Set processors = flow.getProcessors(); + assertEquals(1, processors.size()); + return processors.iterator().next(); + } + + private ProcessorEntity findProcessorByType(final String processGroupId, final String simpleTypeName) throws NiFiClientException, IOException { + final FlowDTO flow = getNifiClient().getFlowClient().getProcessGroup(processGroupId).getProcessGroupFlow().getFlow(); + return flow.getProcessors().stream() + .filter(proc -> proc.getComponent().getType().endsWith("." + simpleTypeName)) + .findFirst() + .orElseThrow(() -> new AssertionError("No processor of type " + simpleTypeName + " found in group " + processGroupId)); + } + + private void executeRebaseWithFingerprint(final ProcessGroupEntity group, final String targetVersion, final String fingerprint) + throws NiFiClientException, IOException, InterruptedException { + + final ProcessGroupEntity groupEntity = getNifiClient().getProcessGroupClient().getProcessGroup(group.getId()); + final VersionControlInformationDTO vciDto = groupEntity.getComponent().getVersionControlInformation(); + vciDto.setVersion(targetVersion); + + final VersionControlInformationEntity vciEntity = new VersionControlInformationEntity(); + vciEntity.setProcessGroupRevision(groupEntity.getRevision()); + vciEntity.setVersionControlInformation(vciDto); + + final RebaseRequestEntity rebaseRequest = new RebaseRequestEntity(); + rebaseRequest.setVersionControlInformationEntity(vciEntity); + rebaseRequest.setAnalysisFingerprint(fingerprint); + + final VersionedFlowUpdateRequestEntity result = getNifiClient().getVersionsClient().initiateRebase(group.getId(), rebaseRequest); + final String requestId = result.getRequest().getRequestId(); + + while (true) { + final VersionedFlowUpdateRequestEntity entity = getNifiClient().getVersionsClient().getRebaseRequest(requestId); + if (entity.getRequest().isComplete()) { + if (entity.getRequest().getFailureReason() != null) { + throw new RuntimeException("Rebase failed: " + entity.getRequest().getFailureReason()); + } + return; + } + Thread.sleep(100L); + } + } +} diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/VersionsClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/VersionsClient.java index 1dd5c707c680..7b21632aa61f 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/VersionsClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/VersionsClient.java @@ -17,6 +17,8 @@ package org.apache.nifi.toolkit.client; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; +import org.apache.nifi.web.api.entity.RebaseRequestEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity; @@ -44,4 +46,12 @@ VersionedFlowUpdateRequestEntity updateVersionControlInfo(String processGroupId, VersionedFlowUpdateRequestEntity deleteRevertFlowVersionRequest(String requestId) throws IOException, NiFiClientException; + RebaseAnalysisEntity getRebaseAnalysis(String processGroupId, String targetVersion) throws IOException, NiFiClientException; + + VersionedFlowUpdateRequestEntity initiateRebase(String processGroupId, RebaseRequestEntity entity) throws IOException, NiFiClientException; + + VersionedFlowUpdateRequestEntity getRebaseRequest(String requestId) throws IOException, NiFiClientException; + + VersionedFlowUpdateRequestEntity deleteRebaseRequest(String requestId) throws IOException, NiFiClientException; + } diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyVersionsClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyVersionsClient.java index 7329da624f05..6b8fc2119f4f 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyVersionsClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyVersionsClient.java @@ -24,6 +24,8 @@ import org.apache.nifi.toolkit.client.RequestConfig; import org.apache.nifi.toolkit.client.VersionsClient; import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.RebaseAnalysisEntity; +import org.apache.nifi.web.api.entity.RebaseRequestEntity; import org.apache.nifi.web.api.entity.StartVersionControlRequestEntity; import org.apache.nifi.web.api.entity.VersionControlInformationEntity; import org.apache.nifi.web.api.entity.VersionedFlowUpdateRequestEntity; @@ -217,4 +219,83 @@ public VersionedFlowUpdateRequestEntity deleteRevertFlowVersionRequest(final Str return getRequestBuilder(target).delete(VersionedFlowUpdateRequestEntity.class); }); } + + // GET /versions/rebase-analysis/process-groups/id + + @Override + public RebaseAnalysisEntity getRebaseAnalysis(final String processGroupId, final String targetVersion) throws IOException, NiFiClientException { + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (StringUtils.isBlank(targetVersion)) { + throw new IllegalArgumentException("Target version cannot be null or blank"); + } + + return executeAction("Error getting rebase analysis", () -> { + final WebTarget target = versionsTarget + .path("rebase-analysis/process-groups/{id}") + .resolveTemplate("id", processGroupId) + .queryParam("targetVersion", targetVersion); + + return getRequestBuilder(target).get(RebaseAnalysisEntity.class); + }); + } + + // POST /versions/rebase-requests/process-groups/id + + @Override + public VersionedFlowUpdateRequestEntity initiateRebase(final String processGroupId, final RebaseRequestEntity entity) throws IOException, NiFiClientException { + if (StringUtils.isBlank(processGroupId)) { + throw new IllegalArgumentException("Process group id cannot be null or blank"); + } + + if (entity == null) { + throw new IllegalArgumentException("Rebase request entity cannot be null"); + } + + return executeAction("Error initiating rebase", () -> { + final WebTarget target = versionsTarget + .path("rebase-requests/process-groups/{id}") + .resolveTemplate("id", processGroupId); + + return getRequestBuilder(target).post( + Entity.entity(entity, MediaType.APPLICATION_JSON_TYPE), + VersionedFlowUpdateRequestEntity.class); + }); + } + + // GET /versions/rebase-requests/id + + @Override + public VersionedFlowUpdateRequestEntity getRebaseRequest(final String requestId) throws IOException, NiFiClientException { + if (StringUtils.isBlank(requestId)) { + throw new IllegalArgumentException("Rebase request id cannot be null or blank"); + } + + return executeAction("Error getting rebase request", () -> { + final WebTarget target = versionsTarget + .path("rebase-requests/{id}") + .resolveTemplate("id", requestId); + + return getRequestBuilder(target).get(VersionedFlowUpdateRequestEntity.class); + }); + } + + // DELETE /versions/rebase-requests/id + + @Override + public VersionedFlowUpdateRequestEntity deleteRebaseRequest(final String requestId) throws IOException, NiFiClientException { + if (StringUtils.isBlank(requestId)) { + throw new IllegalArgumentException("Rebase request id cannot be null or blank"); + } + + return executeAction("Error deleting rebase request", () -> { + final WebTarget target = versionsTarget + .path("rebase-requests/{id}") + .resolveTemplate("id", requestId); + + return getRequestBuilder(target).delete(VersionedFlowUpdateRequestEntity.class); + }); + } }