Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
import org.apache.nifi.components.ClassLoaderAwarePythonBridge;
import org.apache.nifi.components.connector.ConnectorConfigurationProvider;
import org.apache.nifi.components.connector.ConnectorConfigurationProviderInitializationContext;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
import org.apache.nifi.components.connector.ConnectorRepositoryInitializationContext;
import org.apache.nifi.components.connector.ConnectorRequestReplicator;
import org.apache.nifi.components.connector.ConnectorValidationTrigger;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.components.connector.StandardConnectorConfigurationProviderInitializationContext;
import org.apache.nifi.components.connector.StandardConnectorRepoInitializationContext;
import org.apache.nifi.components.connector.StandardConnectorRepository;
Expand Down Expand Up @@ -1023,6 +1025,60 @@ public ConnectorRepository getConnectorRepository() {
return connectorRepository;
}

/**
* Finds a Connection by ID, searching both the root process group hierarchy
* and all connector-managed process groups.
*
* @param connectionId the connection identifier
* @return the Connection, or null if not found
*/
public Connection findConnectionIncludingConnectorManaged(final String connectionId) {
final Connection connection = flowManager.getRootGroup().findConnection(connectionId);
if (connection != null) {
return connection;
}

for (final ConnectorNode connector : connectorRepository.getConnectors()) {
final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
if (flowContext != null) {
final ProcessGroup managedGroup = flowContext.getManagedProcessGroup();
final Connection managedConnection = managedGroup.findConnection(connectionId);
if (managedConnection != null) {
return managedConnection;
}
}
}

return null;
}

/**
* Finds a RemoteGroupPort by ID, searching both the root process group hierarchy
* and all connector-managed process groups.
*
* @param remoteGroupPortId the remote group port identifier
* @return the RemoteGroupPort, or null if not found
*/
public RemoteGroupPort findRemoteGroupPortIncludingConnectorManaged(final String remoteGroupPortId) {
final RemoteGroupPort remoteGroupPort = flowManager.getRootGroup().findRemoteGroupPort(remoteGroupPortId);
if (remoteGroupPort != null) {
return remoteGroupPort;
}

for (final ConnectorNode connector : connectorRepository.getConnectors()) {
final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
if (flowContext != null) {
final ProcessGroup managedGroup = flowContext.getManagedProcessGroup();
final RemoteGroupPort managedPort = managedGroup.findRemoteGroupPort(remoteGroupPortId);
if (managedPort != null) {
return managedPort;
}
}
}

return null;
}

private PythonBridge createPythonBridge(final NiFiProperties nifiProperties, final ControllerServiceProvider serviceProvider) {
final String pythonCommand = nifiProperties.getProperty(NiFiProperties.PYTHON_COMMAND);
if (pythonCommand == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Authorizable createLocalDataAuthorizable(final String componentId) {
final Connectable connectable = flowManager.findConnectable(componentId);
if (connectable == null) {
// if the component id is not a connectable then consider a connection
final Connection connection = flowManager.getRootGroup().findConnection(componentId);
final Connection connection = flowController.findConnectionIncludingConnectorManaged(componentId);

if (connection == null) {
throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
Expand All @@ -70,7 +70,7 @@ public Authorizable createLocalDataAuthorizable(final String componentId) {
public Authorizable createRemoteDataAuthorizable(String remoteGroupPortId) {
final DataAuthorizable authorizable;

final RemoteGroupPort remoteGroupPort = flowController.getFlowManager().getRootGroup().findRemoteGroupPort(remoteGroupPortId);
final RemoteGroupPort remoteGroupPort = flowController.findRemoteGroupPortIncludingConnectorManaged(remoteGroupPortId);
if (remoteGroupPort == null) {
throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
} else {
Expand Down Expand Up @@ -98,7 +98,7 @@ public Authorizable createProvenanceDataAuthorizable(String componentId) {
final Connectable connectable = flowManager.findConnectable(componentId);
if (connectable == null) {
// if the component id is not a connectable then consider a connection
final Connection connection = flowManager.getRootGroup().findConnection(componentId);
final Connection connection = flowController.findConnectionIncludingConnectorManaged(componentId);

if (connection == null) {
throw new ResourceNotFoundException("The component that generated this event is no longer part of the data flow.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;

import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.DataAuthorizable;
import org.apache.nifi.authorization.resource.EnforcePolicyPermissionsThroughBaseResource;
import org.apache.nifi.authorization.resource.ProvenanceDataAuthorizable;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.web.ResourceNotFoundException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class StandardProvenanceAuthorizableFactoryTest {

private StandardProvenanceAuthorizableFactory factory;

@Mock
private FlowController flowController;

@Mock
private FlowManager flowManager;

@Mock
private ProcessGroup rootGroup;

@Mock
private Connection rootConnection;

@Mock
private Connection connectorConnection;

@Mock
private Connectable connectable;

@Mock
private Connectable connectionSource;

@Mock
private Connectable connectorConnectionSource;

@Mock
private RemoteProcessGroup remoteProcessGroup;

private static final String ROOT_GROUP_ID = "root-group-id";
private static final String CONNECTABLE_ID = "connectable-id";
private static final String ROOT_CONNECTION_ID = "root-connection-id";
private static final String CONNECTOR_CONNECTION_ID = "connector-connection-id";
private static final String ROOT_REMOTE_PORT_ID = "root-remote-port-id";
private static final String CONNECTOR_REMOTE_PORT_ID = "connector-remote-port-id";
private static final String NON_EXISTENT_ID = "non-existent-id";

@BeforeEach
void setUp() {
factory = new StandardProvenanceAuthorizableFactory(flowController);

when(flowController.getFlowManager()).thenReturn(flowManager);
when(flowManager.getRootGroup()).thenReturn(rootGroup);
when(flowManager.getRootGroupId()).thenReturn(ROOT_GROUP_ID);

// Connectable lookups (flat map, works for all components)
when(flowManager.findConnectable(CONNECTABLE_ID)).thenReturn(connectable);
when(flowManager.findConnectable(ROOT_CONNECTION_ID)).thenReturn(null);
when(flowManager.findConnectable(CONNECTOR_CONNECTION_ID)).thenReturn(null);
when(flowManager.findConnectable(NON_EXISTENT_ID)).thenReturn(null);

// Connection lookups via FlowController (includes connector-managed PGs)
when(rootConnection.getSource()).thenReturn(connectionSource);
when(connectorConnection.getSource()).thenReturn(connectorConnectionSource);
when(flowController.findConnectionIncludingConnectorManaged(ROOT_CONNECTION_ID)).thenReturn(rootConnection);
when(flowController.findConnectionIncludingConnectorManaged(CONNECTOR_CONNECTION_ID)).thenReturn(connectorConnection);
when(flowController.findConnectionIncludingConnectorManaged(NON_EXISTENT_ID)).thenReturn(null);

// Remote group port lookups via FlowController
when(flowController.findRemoteGroupPortIncludingConnectorManaged(NON_EXISTENT_ID)).thenReturn(null);
}

@Nested
class CreateLocalDataAuthorizable {

@Test
void testRootGroupIdReturnsDataAuthorizableForRootGroup() {
final Authorizable result = factory.createLocalDataAuthorizable(ROOT_GROUP_ID);

assertInstanceOf(DataAuthorizable.class, result);
assertEquals(rootGroup, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testConnectableReturnsDataAuthorizableForConnectable() {
final Authorizable result = factory.createLocalDataAuthorizable(CONNECTABLE_ID);

assertInstanceOf(DataAuthorizable.class, result);
assertEquals(connectable, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testConnectionInRootGroupReturnsDataAuthorizableForSource() {
final Authorizable result = factory.createLocalDataAuthorizable(ROOT_CONNECTION_ID);

assertInstanceOf(DataAuthorizable.class, result);
assertEquals(connectionSource, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testConnectionInConnectorManagedGroupReturnsDataAuthorizableForSource() {
final Authorizable result = factory.createLocalDataAuthorizable(CONNECTOR_CONNECTION_ID);

assertInstanceOf(DataAuthorizable.class, result);
assertEquals(connectorConnectionSource, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testNonExistentComponentThrowsResourceNotFoundException() {
assertThrows(ResourceNotFoundException.class, () ->
factory.createLocalDataAuthorizable(NON_EXISTENT_ID)
);
}
}

@Nested
class CreateProvenanceDataAuthorizable {

@Test
void testRootGroupIdReturnsProvenanceDataAuthorizableForRootGroup() {
final Authorizable result = factory.createProvenanceDataAuthorizable(ROOT_GROUP_ID);

assertInstanceOf(ProvenanceDataAuthorizable.class, result);
assertEquals(rootGroup, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testConnectableReturnsProvenanceDataAuthorizableForConnectable() {
final Authorizable result = factory.createProvenanceDataAuthorizable(CONNECTABLE_ID);

assertInstanceOf(ProvenanceDataAuthorizable.class, result);
assertEquals(connectable, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testConnectionInRootGroupReturnsProvenanceDataAuthorizableForSource() {
final Authorizable result = factory.createProvenanceDataAuthorizable(ROOT_CONNECTION_ID);

assertInstanceOf(ProvenanceDataAuthorizable.class, result);
assertEquals(connectionSource, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testConnectionInConnectorManagedGroupReturnsProvenanceDataAuthorizableForSource() {
final Authorizable result = factory.createProvenanceDataAuthorizable(CONNECTOR_CONNECTION_ID);

assertInstanceOf(ProvenanceDataAuthorizable.class, result);
assertEquals(connectorConnectionSource, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testNonExistentComponentThrowsResourceNotFoundException() {
assertThrows(ResourceNotFoundException.class, () ->
factory.createProvenanceDataAuthorizable(NON_EXISTENT_ID)
);
}
}

@Nested
class CreateRemoteDataAuthorizable {

private RemoteGroupPort rootRemoteGroupPort;
private RemoteGroupPort connectorRemoteGroupPort;

@Mock
private RemoteProcessGroup connectorRemoteProcessGroup;

@BeforeEach
void setUpRemotePorts() {
rootRemoteGroupPort = mock(RemoteGroupPort.class);
connectorRemoteGroupPort = mock(RemoteGroupPort.class);

when(flowController.findRemoteGroupPortIncludingConnectorManaged(ROOT_REMOTE_PORT_ID)).thenReturn(rootRemoteGroupPort);
when(rootRemoteGroupPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);

when(flowController.findRemoteGroupPortIncludingConnectorManaged(CONNECTOR_REMOTE_PORT_ID)).thenReturn(connectorRemoteGroupPort);
when(connectorRemoteGroupPort.getRemoteProcessGroup()).thenReturn(connectorRemoteProcessGroup);
}

@Test
void testRemoteGroupPortInRootGroupReturnsDataAuthorizableForRemoteProcessGroup() {
final Authorizable result = factory.createRemoteDataAuthorizable(ROOT_REMOTE_PORT_ID);

assertInstanceOf(DataAuthorizable.class, result);
assertEquals(remoteProcessGroup, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testRemoteGroupPortInConnectorManagedGroupReturnsDataAuthorizableForRemoteProcessGroup() {
final Authorizable result = factory.createRemoteDataAuthorizable(CONNECTOR_REMOTE_PORT_ID);

assertInstanceOf(DataAuthorizable.class, result);
assertEquals(connectorRemoteProcessGroup, ((EnforcePolicyPermissionsThroughBaseResource) result).getBaseAuthorizable());
}

@Test
void testNonExistentRemoteGroupPortThrowsResourceNotFoundException() {
assertThrows(ResourceNotFoundException.class, () ->
factory.createRemoteDataAuthorizable(NON_EXISTENT_ID)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,6 @@ private ProvenanceEventDTO createProvenanceEventDto(final ProvenanceEventRecord

private void setComponentDetails(final ProvenanceEventDTO dto) {
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ProcessGroup root = getRootGroup();

final Connectable connectable = findLocalConnectable(dto.getComponentId());
if (connectable != null) {
Expand All @@ -1879,15 +1878,11 @@ private void setComponentDetails(final ProvenanceEventDTO dto) {
return;
}

final RemoteGroupPort remoteGroupPort = root.findRemoteGroupPort(dto.getComponentId());
final RemoteGroupPort remoteGroupPort = flowController.findRemoteGroupPortIncludingConnectorManaged(dto.getComponentId());
if (remoteGroupPort != null) {
final String remoteGroupPortGroupId = remoteGroupPort.getProcessGroupIdentifier();
dto.setGroupId(remoteGroupPortGroupId);

final ProcessGroup remotePortGroup = root.findProcessGroup(remoteGroupPortGroupId);
if (remotePortGroup != null) {
remotePortGroup.getConnectorIdentifier().ifPresent(dto::setConnectorId);
}
final ProcessGroup remotePortGroup = remoteGroupPort.getProcessGroup();
dto.setGroupId(remotePortGroup.getIdentifier());
remotePortGroup.getConnectorIdentifier().ifPresent(dto::setConnectorId);

// if the user is approved for this component policy, provide additional details, otherwise override/redact as necessary
if (Result.Approved.equals(remoteGroupPort.checkAuthorization(authorizer, RequestAction.READ, user).getResult())) {
Expand All @@ -1900,7 +1895,7 @@ private void setComponentDetails(final ProvenanceEventDTO dto) {
return;
}

final Connection connection = root.findConnection(dto.getComponentId());
final Connection connection = flowController.findConnectionIncludingConnectorManaged(dto.getComponentId());
if (connection != null) {
final ProcessGroup connectionGroup = connection.getProcessGroup();
dto.setGroupId(connectionGroup.getIdentifier());
Expand Down
Loading
Loading