MTV-3129: Add ClusterRole (forklift-migrator-role) migration tests for local remote cluster #293

Open

AmenB wants to merge 1 commit into RedHatQE:main from AmenB:clusterrole

Conversation

@AmenB
Contributor

@AmenB AmenB commented Feb 11, 2026

PR Type

Tests, Enhancement


Description

  • Add comprehensive ClusterRole migration tests for forklift-migrator-role

    • Cold migration test with token-based OCP provider
    • Warm migration test with token-based OCP provider
    • ConfigMap and Secret migration verification tests
  • Implement clusterrole_destination_ocp_provider fixture for token-based authentication

    • Creates fresh ServiceAccount and binds to existing forklift-migrator-role
    • Generates token and creates Forklift Provider CR
  • Add utility functions for ConfigMap and Secret migration verification

  • Fix virtctl_binary fixture to respect VIRTCTL_PATH environment variable

  • Add test configuration parameters for three ClusterRole migration scenarios


Diagram Walkthrough

flowchart LR
  A["Test Config<br/>test_mtv_clusterrole_*"] -->|defines| B["Test Classes<br/>Cold/Warm/ConfigMap-Secret"]
  C["clusterrole_destination_ocp_provider<br/>fixture"] -->|creates| D["ServiceAccount<br/>+ ClusterRoleBinding"]
  D -->|generates| E["Token Secret"]
  E -->|creates| F["Forklift Provider CR"]
  B -->|uses| F
  B -->|verifies| G["VM Migration<br/>+ ConfigMap/Secret"]
  H["clusterrole_utils<br/>verify functions"] -->|validates| G

File Walkthrough

Relevant files
Enhancement
conftest.py
Add token-based OCP provider fixture and env var support 

conftest.py

  • Added base64 import for token decoding
  • Added imports for ClusterRoleBinding and ServiceAccount resources
  • Fixed virtctl_binary fixture to check VIRTCTL_PATH environment
    variable first
  • Implemented clusterrole_destination_ocp_provider session-scoped
    fixture that creates ServiceAccount, binds to forklift-migrator-role,
    generates token, and creates Forklift Provider CR
  • Added FORKLIFT_MIGRATOR_ROLE_NAME constant for the existing
    ClusterRole
+109/-0 
clusterrole_utils.py
Add ConfigMap and Secret migration verification utilities

utilities/clusterrole_utils.py

  • Implemented verify_configmap_migrated function to assert ConfigMap
    exists in target namespace and data matches source
  • Implemented verify_secret_migrated function to assert Secret exists in
    target namespace and data matches source
  • Both functions include comprehensive logging and detailed assertion
    error messages
  • Functions handle missing resources and data mismatches gracefully
+116/-0 
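
For orientation, here is a minimal sketch of what verify_configmap_migrated likely looks like, mirroring the verify_secret_migrated helper quoted later in this review; it assumes the ConfigMap wrapper from ocp_resources exposes the same .exists and .instance.data interface as Secret:

from ocp_resources.configmap import ConfigMap


def verify_configmap_migrated(client, source_namespace, target_namespace, configmap_name):
    """Assert the ConfigMap exists in the target namespace with the same data as the source."""
    source_cm = ConfigMap(client=client, name=configmap_name, namespace=source_namespace)
    if not source_cm.exists:
        raise AssertionError(f"Source ConfigMap {configmap_name} not found in namespace {source_namespace}")

    target_cm = ConfigMap(client=client, name=configmap_name, namespace=target_namespace)
    assert target_cm.exists, f"ConfigMap {configmap_name} was not migrated to target namespace {target_namespace}"

    source_data = source_cm.instance.data or {}
    target_data = target_cm.instance.data or {}
    assert source_data == target_data, (
        f"ConfigMap {configmap_name} data mismatch: source {source_data} != target {target_data}"
    )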
Tests
test_mtv_clusterrole_migration.py
Add ClusterRole migration test suite                                         

tests/test_mtv_clusterrole_migration.py

  • Created three test classes for ClusterRole migration scenarios
  • TestClusterroleColdMtvMigration: verifies cold migration with
    token-based provider
  • TestClusterroleWarmMtvMigration: verifies warm migration with
    token-based provider
  • TestClusterroleConfigmapSecretMigration: verifies ConfigMap and Secret
    migration alongside VM migration
  • All tests use clusterrole_destination_ocp_provider fixture and verify
    VM running status post-migration
+298/-0 
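
As a rough illustration only (not the actual test code), a cold-migration test in this module plausibly wires the fixture and utilities together as below; the verify_vms_running call mirrors the snippet quoted later in this thread, while the run_clusterrole_migration argument names are guesses based on the walkthrough:

import pytest

from utilities.clusterrole_utils import run_clusterrole_migration, verify_vms_running


@pytest.mark.remote
@pytest.mark.tier0
class TestClusterroleColdMtvMigration:
    def test_migrate_vms(
        self,
        fixture_store,
        ocp_admin_client,
        clusterrole_destination_ocp_provider,
        target_namespace,
    ):
        # Build storage/network maps, create the Plan against the token-based provider, and wait for completion.
        prepared_plan = run_clusterrole_migration(
            ocp_admin_client=ocp_admin_client,
            fixture_store=fixture_store,
            destination_provider=clusterrole_destination_ocp_provider,  # hypothetical argument name
            target_namespace=target_namespace,
        )
        # Migrated VMs must reach Running status in the target namespace.
        verify_vms_running(
            ocp_admin_client=ocp_admin_client,
            prepared_plan=prepared_plan,
            target_namespace=target_namespace,
        )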
Configuration changes
config.py
Add test parameters for ClusterRole migration tests           

tests/tests_config/config.py

  • Added test_mtv_clusterrole_cold_migration configuration with single
    RHEL8 VM and cold migration
  • Added test_mtv_clusterrole_warm_migration configuration with single
    RHEL8 VM powered on and warm migration enabled
  • Added test_mtv_clusterrole_configmap_secret_migration configuration
    with single RHEL8 VM and cold migration
+22/-0   
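
For orientation only, the new config entries presumably follow the repo's existing per-test pattern; the key names below are hypothetical and merely illustrate the described scenarios (single RHEL8 VM, cold vs. warm):

# Hypothetical shape; key names are illustrative, not the repository's actual schema.
"test_mtv_clusterrole_cold_migration": {
    "virtual_machines": [{"name": "rhel8-vm"}],  # single RHEL8 VM, cold migration
    "warm_migration": False,
},
"test_mtv_clusterrole_warm_migration": {
    "virtual_machines": [{"name": "rhel8-vm", "running": True}],  # powered on for warm migration
    "warm_migration": True,
},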

Summary by CodeRabbit

  • New Features

    • ClusterRole-based destination provider using a service-account token.
    • Environment variable override to select the virtctl binary.
  • Utilities

    • Helpers to run ClusterRole-based migrations and verify VM status, ConfigMap, and Secret migration results.
  • Tests

    • End-to-end tests covering cold, warm, and ConfigMap/Secret ClusterRole migration scenarios.

@coderabbitai
Contributor

coderabbitai Bot commented Feb 11, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds a ClusterRole-based destination provider fixture, three end-to-end MTV clusterrole migration tests (cold, warm, configmap/secret), supporting test configs, new migration utilities for plan execution and verification, a VIRTCTL_PATH env override, and constant FORKLIFT_MIGRATOR_ROLE_NAME.

Changes

Cohort / File(s) | Summary
Test infra & fixture
conftest.py
Added FORKLIFT_MIGRATOR_ROLE_NAME constant; virtctl_binary now honors VIRTCTL_PATH; new clusterrole_destination_ocp_provider fixture that creates a ServiceAccount, ClusterRoleBinding (binds SA to forklift-migrator-role), waits for SA token Secret, decodes base64 token, creates provider Secret (token + insecureSkipVerify), and creates a Forklift Provider CR. Added base64 import, TYPE_CHECKING imports for ClusterRoleBinding/ServiceAccount, and timeout-related imports.
Tests & configs
tests/test_mtv_clusterrole_migration.py, tests/tests_config/config.py
New test module with three parameterized end-to-end scenarios: cold migration, warm migration, and ConfigMap/Secret migration; tests marked remote/tier0 and use cleanup_migrated_vms; added corresponding test entries in test configs.
Utilities
utilities/clusterrole_utils.py
New helpers: run_clusterrole_migration(...) to build storage/network maps, create and execute Plan, and wait for completion; verify_vms_running(...) to assert migrated VMs are RUNNING; verify_configmap_migrated(...) and verify_secret_migrated(...) to validate resource existence and payload equality between source and target namespaces.

Sequence Diagram(s)

sequenceDiagram
    participant Test
    participant Fixture as clusterrole_destination_ocp_provider
    participant OCP as OpenShift Cluster
    participant Forklift as Forklift Provider
    participant Migration as Migration Plan

    Test->>Fixture: request provider
    Fixture->>OCP: create ServiceAccount
    OCP-->>Fixture: SA created
    Fixture->>OCP: create ClusterRoleBinding (SA -> forklift-migrator-role)
    OCP-->>Fixture: CRB created
    Fixture->>OCP: create service-account-token Secret
    OCP-->>Fixture: Secret created (base64 token)
    Fixture->>Fixture: decode token (base64)
    Fixture->>OCP: create provider Secret (token + insecureSkipVerify)
    OCP-->>Fixture: provider Secret created
    Fixture->>Forklift: create Provider CR referencing provider Secret
    Forklift-->>Fixture: Provider CR created
    Fixture-->>Test: yield OCPProvider
    Test->>Migration: run_clusterrole_migration(...)
    Migration->>OCP: create/execute Plan, migrate VMs/resources
    OCP-->>Migration: migration finished
    Test->>Test: verify_vms_running / verify_configmap_migrated / verify_secret_migrated

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
Check name | Status | Explanation | Resolution
Merge Conflict Detection | ⚠️ Warning | ❌ Merge conflicts detected (12 files):

⚔️ .pre-commit-config.yaml (content)
⚔️ README.md (content)
⚔️ conftest.py (content)
⚔️ docs/copyoffload/how-to-run-copyoffload-tests.md (content)
⚔️ exceptions/exceptions.py (content)
⚔️ pyproject.toml (content)
⚔️ tests/test_copyoffload_migration.py (content)
⚔️ tests/test_mtv_warm_migration.py (content)
⚔️ tests/tests_config/config.py (content)
⚔️ utilities/pytest_utils.py (content)
⚔️ utilities/utils.py (content)
⚔️ uv.lock (content)

These conflicts must be resolved before merging into main.
Resolve conflicts locally and push changes to this branch.
✅ Passed checks (3 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled.
Title check | ✅ Passed | The title accurately describes the main change: introducing ClusterRole-based migration tests for forklift-migrator-role with remote cluster support, which aligns with all file additions across conftest.py, test_mtv_clusterrole_migration.py, utilities, and config.
Docstring Coverage | ✅ Passed | Docstring coverage is 83.33% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.




Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@redhat-qe-bot

Report bugs in Issues

Welcome! 🎉

This pull request will be automatically processed with the following features:

🔄 Automatic Actions

  • Reviewer Assignment: Reviewers are automatically assigned based on the OWNERS file in the repository root
  • Size Labeling: PR size labels (XS, S, M, L, XL, XXL) are automatically applied based on changes
  • Issue Creation: Disabled for this repository
  • Branch Labeling: Branch-specific labels are applied to track the target branch
  • Auto-verification: Auto-verified users have their PRs automatically marked as verified
  • Labels: All label categories are enabled (default configuration)

📋 Available Commands

PR Status Management

  • /wip - Mark PR as work in progress (adds WIP: prefix to title)
  • /wip cancel - Remove work in progress status
  • /hold - Block PR merging (approvers only)
  • /hold cancel - Unblock PR merging
  • /verified - Mark PR as verified
  • /verified cancel - Remove verification status
  • /reprocess - Trigger complete PR workflow reprocessing (useful if webhook failed or configuration changed)
  • /regenerate-welcome - Regenerate this welcome message

Review & Approval

  • /lgtm - Approve changes (looks good to me)
  • /approve - Approve PR (approvers only)
  • /automerge - Enable automatic merging when all requirements are met (maintainers and approvers only)
  • /assign-reviewers - Assign reviewers based on OWNERS file
  • /assign-reviewer @username - Assign specific reviewer
  • /check-can-merge - Check if PR meets merge requirements

Testing & Validation

  • /retest tox - Run Python test suite with tox
  • /retest build-container - Rebuild and test container image
  • /retest all - Run all available tests

Container Operations

  • /build-and-push-container - Build and push container image (tagged with PR number)
    • Supports additional build arguments: /build-and-push-container --build-arg KEY=value

Cherry-pick Operations

  • /cherry-pick <branch> - Schedule cherry-pick to target branch when PR is merged
    • Multiple branches: /cherry-pick branch1 branch2 branch3

Label Management

  • /<label-name> - Add a label to the PR
  • /<label-name> cancel - Remove a label from the PR

✅ Merge Requirements

This PR will be automatically approved when the following conditions are met:

  1. Approval: /approve from at least one approver
  2. LGTM Count: Minimum 0 /lgtm from reviewers
  3. Status Checks: All required status checks must pass
  4. No Blockers: No WIP, hold, conflict labels
  5. Verified: PR must be marked as verified (if verification is enabled)

📊 Review Process

Approvers and Reviewers

Approvers:

  • myakove

Reviewers:

  • krcmarik
  • myakove

Available Labels

  • hold
  • verified
  • wip
  • lgtm
  • approve
  • automerge

💡 Tips

  • WIP Status: Use /wip when your PR is not ready for review
  • Verification: The verified label is automatically removed on each new commit
  • Cherry-picking: Cherry-pick labels are processed when the PR is merged
  • Container Builds: Container images are automatically tagged with the PR number
  • Permission Levels: Some commands require approver permissions
  • Auto-verified Users: Certain users have automatic verification and merge privileges

For more information, please refer to the project documentation or contact the maintainers.

@qodo-code-review

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 Security concerns

Sensitive information handling:
The new fixture in conftest.py materializes a live ServiceAccount token (token_value) and stores it in a Kubernetes Secret (provider_secret). That is expected for the Provider CR, but it increases blast radius if cluster logs/failed test artifacts capture Secret contents. Ensure no logging prints Secret .instance or token value, and verify teardown/cleanup reliably removes the created ServiceAccount/ClusterRoleBinding/token Secret/provider Secret.

Executable path injection: virtctl_binary now accepts VIRTCTL_PATH and returns it without validation, which can allow running an attacker-controlled binary if the environment is influenced in CI or shared runners. Validate the path (exists, file, executable, non-symlink) before use.

⚡ Recommended focus areas for review

Insecure Env Path

The virtctl_binary fixture trusts VIRTCTL_PATH blindly and returns it without validation. This can lead to executing an unexpected binary (or a non-executable file) if the environment is manipulated. Add checks that the path exists, is a regular file, and is executable (and ideally resolve/forbid symlinks to match the existing hijack protections used for the cached binary path).

#if env variable VIRTCTL_PATH is set, use it.
if os.environ.get("VIRTCTL_PATH"):
    return Path(os.environ.get("VIRTCTL_PATH"))
Token Handling

The token acquisition flow relies on a service-account-token Secret being populated asynchronously, then base64-decoding the token. This needs more defensive handling: ensure the Secret read is consistent (avoid recreating objects in the sampler), handle decode errors, and consider increasing robustness around timing/ready conditions. Also ensure the created Provider Secret is properly cleaned up/redacted in logs and that only the minimum required RBAC is bound to the ServiceAccount.

# Token secret: create Secret with type service-account-token so cluster populates token
token_secret = create_and_store_resource(
    client=ocp_admin_client,
    fixture_store=fixture_store,
    resource=Secret,
    name=token_secret_name,
    namespace=target_namespace,
    type="kubernetes.io/service-account-token",
    annotations={"kubernetes.io/service-account.name": sa_name},
)

# Wait for token to be populated (controller fills it asynchronously)
def _has_token():
    s = Secret(client=ocp_admin_client, name=token_secret_name, namespace=target_namespace)
    s.wait()
    return (s.instance.data or {}).get("token")

for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
    if sample:
        token_b64 = sample
        break
else:
    raise ValueError(
        f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
    )

token_value = base64.b64decode(token_b64).decode("utf-8")

# Provider secret (Forklift expects "token" and "insecureSkipVerify")
provider_secret = create_and_store_resource(
    client=ocp_admin_client,
    fixture_store=fixture_store,
    resource=Secret,
    name=f"{provider_name}-secret",
    namespace=target_namespace,
    string_data={"token": token_value, "insecureSkipVerify": "true"},
)

# Provider CR
provider = create_and_store_resource(
    client=ocp_admin_client,
    fixture_store=fixture_store,
    resource=Provider,
    name=provider_name,
    namespace=target_namespace,
    secret_name=provider_secret.name,
    secret_namespace=provider_secret.namespace,
    url=ocp_admin_client.configuration.host,
    provider_type=Provider.ProviderType.OPENSHIFT,
)

yield OCPProvider(ocp_resource=provider, fixture_store=fixture_store)
Secret Compare

verify_secret_migrated claims it compares decoded secret data, but it actually compares the raw .instance.data values (base64-encoded). Either update the docstring to match behavior or decode both sides before comparison. Also consider comparing type and handling stringData vs data differences to avoid false negatives across migration implementations.

def verify_secret_migrated(
    client: "DynamicClient",
    source_namespace: str,
    target_namespace: str,
    secret_name: str,
) -> None:
    """Verify that a Secret was migrated from source to target namespace.

    Asserts the Secret exists in the target namespace and that its data
    matches the source Secret (compares decoded data).

    Args:
        client: OpenShift/Kubernetes client.
        source_namespace: Namespace where the Secret was created before migration.
        target_namespace: Namespace where the Secret should exist after migration.
        secret_name: Name of the Secret.

    Raises:
        AssertionError: If Secret is missing in target or data does not match.
    """
    source_secret = Secret(
        client=client,
        name=secret_name,
        namespace=source_namespace,
    )
    if not source_secret.exists:
        raise AssertionError(
            f"Source Secret {secret_name} not found in namespace {source_namespace}"
        )

    target_secret = Secret(
        client=client,
        name=secret_name,
        namespace=target_namespace,
    )
    assert target_secret.exists, (
        f"Secret {secret_name} was not migrated to target namespace {target_namespace}"
    )

    source_data = source_secret.instance.data or {}
    target_data = target_secret.instance.data or {}
    assert set(source_data.keys()) == set(target_data.keys()), (
        f"Secret {secret_name} keys mismatch: source {list(source_data)} != target {list(target_data)}"
    )
    for key in source_data:
        assert source_data[key] == target_data[key], (
            f"Secret {secret_name} data for key {key} mismatch between source and target"
        )
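
If the intent is to compare decoded values, as the docstring claims, one hedged option is to decode both sides before comparing, along these lines (a sketch only; it assumes the data values are valid base64):

import base64


def _decode_secret_data(data):
    # Secret .instance.data values are base64-encoded strings; decode them for a value-level comparison.
    return {key: base64.b64decode(value) for key, value in (data or {}).items()}


decoded_source = _decode_secret_data(source_secret.instance.data)
decoded_target = _decode_secret_data(target_secret.instance.data)
assert decoded_source == decoded_target, (
    f"Secret {secret_name} decoded data mismatch between {source_namespace} and {target_namespace}"
)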

💡 Tool usage guide:

Overview:
The review tool scans the PR code changes and generates a PR review that includes several types of feedback, such as possible PR issues, security threats, and relevant tests in the PR. More feedback can be added by configuring the tool.

The tool can be triggered automatically every time a new PR is opened, or can be invoked manually by commenting on any PR.

  • When commenting, to edit configurations related to the review tool (pr_reviewer section), use the following template:
/review --pr_reviewer.some_config1=... --pr_reviewer.some_config2=...
[pr_reviewer]
some_config1=...
some_config2=...

See the review usage page for a comprehensive guide on using this tool.

@qodo-code-review

qodo-code-review Bot commented Feb 11, 2026

PR Code Suggestions ✨

Latest suggestions up to 383e62f

Category | Suggestion | Impact
Possible bug
Make token polling more robust

Refactor the token polling logic to be more efficient by waiting for the secret
once and then using get() in the polling loop. Add a try-except block to handle
potential base64 decoding errors.

conftest.py [870-895]

 token_secret_ref = Secret(
     client=ocp_admin_client,
     name=token_secret_name,
     namespace=target_namespace,
 )
+token_secret_ref.wait(timeout=60)
 
 def _has_token() -> str | None:
-    token_secret_ref.wait()
+    token_secret_ref.get()
     return (token_secret_ref.instance.data or {}).get("token")
 
 token_b64 = None
 try:
     for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
         if sample:
             token_b64 = sample
             break
-    if not token_b64:
-        raise ValueError(
-            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-        )
 except TimeoutExpiredError:
+    token_b64 = None
+
+if not token_b64:
     raise ValueError(
         f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-    ) from None
+    )
 
-token_value = base64.b64decode(token_b64).decode("utf-8")
+try:
+    token_value = base64.b64decode(token_b64).decode("utf-8")
+except (ValueError, UnicodeDecodeError) as exc:
+    raise ValueError(f"Invalid base64 token in Secret {token_secret_name}") from exc
Suggestion importance[1-10]: 7

Why: The suggestion improves the token polling logic by replacing an inefficient wait() call inside the loop with a more appropriate get(), and adds robust error handling for base64 decoding, making the code more reliable.

Medium
Prevent status polling attribute errors

Make the VM status check more robust by safely accessing printableStatus to
prevent potential AttributeError exceptions while the VM is being created.

utilities/post_migration.py [1377-1387]

+def _get_printable_status(_vm: VirtualMachine) -> str | None:
+    instance = _vm.instance
+    status = getattr(instance, "status", None) if instance else None
+    return getattr(status, "printableStatus", None) if status else None
+
 for sample in TimeoutSampler(
     wait_timeout=remaining,
     sleep=VM_RUNNING_POLL_INTERVAL,
-    func=lambda _vm=vm: _vm.instance.status.printableStatus,
+    func=lambda _vm=vm: _get_printable_status(_vm),
 ):
-    if sample == VirtualMachine.Status.RUNNING:
+    if sample == "Running":
         LOGGER.info(f"VM {vm.name} reached Running status")
         break
     if sample != last_status:
         LOGGER.info(f"VM {vm.name} status: {sample}, waiting for Running")
         last_status = sample
Suggestion importance[1-10]: 7

Why: The suggestion correctly identifies a potential AttributeError if the VM status is not immediately available and provides a robust way to handle it, preventing intermittent test failures.

Medium
Security
Validate env-provided binary path

Validate that the VIRTCTL_PATH environment variable points to an existing,
executable file before using it. Also, fix a typo in the virtctl_binary
function's docstring.

conftest.py [454-480]

 def virtctl_binary(ocp_admin_client: "DynamicClient") -> Path:
     """Download and configure virtctl binary from the cluster.
 
     ...
 
-    Returns:F
+    Returns:
         Path: Path to the downloaded virtctl binary.
 
     ...
     """
     # if env variable VIRTCTL_PATH is set, use it.
     if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
-        return Path(virtctl_env_path)
+        virtctl_path = Path(virtctl_env_path).expanduser().resolve()
+        if not virtctl_path.is_file():
+            raise ValueError(f"VIRTCTL_PATH does not point to a file: {virtctl_path}")
+        if not os.access(virtctl_path, os.X_OK):
+            raise ValueError(f"VIRTCTL_PATH is not executable: {virtctl_path}")
+        return virtctl_path

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a lack of validation for the VIRTCTL_PATH environment variable, and the proposed change improves robustness and provides clearer errors. It also fixes a minor docstring typo.

Low

Previous suggestions

Suggestions up to commit 41829a4
Category | Suggestion | Impact
Possible bug
Use TokenRequest for SA token

Use the TokenRequest API to obtain the service account token instead of creating
a secret and waiting for it to be populated, as the latter method is deprecated
and may fail on modern clusters.

conftest.py [858-878]

-# Token secret: create Secret with type service-account-token so cluster populates token
-create_and_store_resource(
-    client=ocp_admin_client,
-    fixture_store=fixture_store,
-    resource=Secret,
-    name=token_secret_name,
-    namespace=target_namespace,
-    type="kubernetes.io/service-account-token",
-    annotations={"kubernetes.io/service-account.name": sa_name},
-)
+def _request_sa_token() -> str:
+    api = ocp_admin_client.resources.get(
+        api_version="authentication.k8s.io/v1",
+        kind="TokenRequest",
+    )
+    token_request = api.create(
+        body={
+            "apiVersion": "authentication.k8s.io/v1",
+            "kind": "TokenRequest",
+            "spec": {"expirationSeconds": 3600},
+        },
+        namespace=target_namespace,
+        # subresource call: POST .../serviceaccounts/{name}/token
+        name=sa_name,
+        subresource="token",
+    )
+    token = (((token_request or {}).get("status") or {}).get("token"))
+    if token is None:
+        raise ValueError(f"TokenRequest did not return a token for ServiceAccount {sa_name}")
+    return token
 
-# Wait for token to be populated (controller fills it asynchronously)
-token_secret_ref = Secret(
-    client=ocp_admin_client,
-    name=token_secret_name,
-    namespace=target_namespace,
-)
+token_value = _request_sa_token()
 
-def _has_token() -> str | None:
-    token_secret_ref.wait()
-    return (token_secret_ref.instance.data or {}).get("token")
-
Suggestion importance[1-10]: 9


Why: The suggestion replaces a deprecated and often-disabled mechanism for obtaining service account tokens with the modern TokenRequest API, significantly improving the fixture's robustness and forward-compatibility.

High
Harden VM status polling

Improve the VM status polling logic to be more robust. Safely access the
printableStatus attribute to prevent errors and correct the calculation of the
remaining timeout to respect the total budget.

utilities/post_migration.py [1364-1392]

 for vm_config in prepared_plan["virtual_machines"]:
     vm = VirtualMachine(
         client=ocp_admin_client,
         name=vm_config["name"],
         namespace=target_namespace,
     )
     start = time.monotonic()
     vm.wait(timeout=VM_RUNNING_TIMEOUT)
     elapsed = time.monotonic() - start
-    remaining = max(VM_RUNNING_TIMEOUT - elapsed, 10)
+    remaining = max(VM_RUNNING_TIMEOUT - elapsed, 0)
+
     LOGGER.info(f"VM {vm.name} resource exists, waiting for Running status")
     last_status: str | None = None
+
+    def _get_printable_status(_vm: VirtualMachine = vm) -> str | None:
+        status = getattr(_vm.instance, "status", None)
+        return getattr(status, "printableStatus", None)
+
     try:
         for sample in TimeoutSampler(
             wait_timeout=remaining,
             sleep=VM_RUNNING_POLL_INTERVAL,
-            func=lambda _vm=vm: _vm.instance.status.printableStatus,
+            func=_get_printable_status,
         ):
             if sample == VirtualMachine.Status.RUNNING:
                 LOGGER.info(f"VM {vm.name} reached Running status")
                 break
             if sample != last_status:
                 LOGGER.info(f"VM {vm.name} status: {sample}, waiting for Running")
                 last_status = sample
     except TimeoutExpiredError:
         raise AssertionError(
             f"VM {vm.name} did not reach Running status within {VM_RUNNING_TIMEOUT}s. "
             f"Last observed status: {last_status}"
         ) from None
Suggestion importance[1-10]: 7


Why: The suggestion improves the robustness of the VM status polling logic by safely accessing nested attributes and correctly calculating the remaining timeout, preventing potential errors and ensuring the timeout is respected.

Medium
Security
Validate configured binary path

Validate that the path provided in the VIRTCTL_PATH environment variable exists,
is not a symlink, and is executable before using it.

conftest.py [478-480]

-# if env variable VIRTCTL_PATH is set, use it.
+# If env variable VIRTCTL_PATH is set, use it (after validation).
 if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
-    return Path(virtctl_env_path)
+    virtctl_path = Path(virtctl_env_path)
 
+    if not virtctl_path.exists():
+        raise ValueError(f"VIRTCTL_PATH '{virtctl_path}' does not exist")
+
+    if virtctl_path.is_symlink():
+        raise PermissionError(f"VIRTCTL_PATH '{virtctl_path}' must not be a symlink")
+
+    if not os.access(virtctl_path, os.X_OK):
+        raise ValueError(f"VIRTCTL_PATH '{virtctl_path}' is not executable")
+
+    return virtctl_path
+
Suggestion importance[1-10]: 8


Why: The suggestion improves security by validating the VIRTCTL_PATH to prevent symlink attacks and enhances robustness by ensuring the provided path is a valid, executable file.

Medium

best practice
Validate base64 token decoding

Improve the base64 decoding of the service account token by stripping whitespace
and adding validation to prevent errors from malformed data.

conftest.py [895]

-token_value = base64.b64decode(token_b64).decode("utf-8")
+token_bytes = base64.b64decode(token_b64.strip(), validate=True)
+token_value = token_bytes.decode("utf-8")
Suggestion importance[1-10]: 5


Why: The suggestion adds defensive decoding for the base64 token, which is a good practice to make the code more robust against malformed data, although the likelihood of this occurring in this context is low.

Low
Suggestions up to commit b8d4e97
Category | Suggestion | Impact
Possible bug
Prevent flaky status polling crashes

Make VM status polling more robust by adding checks to prevent AttributeError if
instance, status, or printableStatus are temporarily absent.

utilities/post_migration.py [1377-1387]

+def _get_printable_status(_vm: VirtualMachine) -> str | None:
+    instance = _vm.instance
+    if not instance:
+        return None
+    status = getattr(instance, "status", None)
+    if not status:
+        return None
+    return getattr(status, "printableStatus", None)
+
 for sample in TimeoutSampler(
     wait_timeout=remaining,
     sleep=VM_RUNNING_POLL_INTERVAL,
-    func=lambda _vm=vm: _vm.instance.status.printableStatus,
+    func=lambda _vm=vm: _get_printable_status(_vm),
 ):
-    if sample == VirtualMachine.Status.RUNNING:
+    if str(sample) == str(VirtualMachine.Status.RUNNING):
         LOGGER.info(f"VM {vm.name} reached Running status")
         break
     if sample != last_status:
         LOGGER.info(f"VM {vm.name} status: {sample}, waiting for Running")
         last_status = sample
Suggestion importance[1-10]: 7


Why: The suggestion improves the robustness of the VM status polling logic by defensively handling potentially missing attributes (instance, status, printableStatus), which prevents flaky test failures due to AttributeError.

Medium
Validate external binary override

Fix a docstring typo and add validation to ensure the VIRTCTL_PATH environment
variable points to an existing, executable file.

conftest.py [454-480]

 def virtctl_binary(ocp_admin_client: "DynamicClient") -> Path:
     """Download and configure virtctl binary from the cluster.
 ...
-    Returns:F
+    Returns:
         Path: Path to the downloaded virtctl binary.
 ...
     """
-    # if env variable VIRTCTL_PATH is set, use it.
+    # If env variable VIRTCTL_PATH is set, use it (validated).
     if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
-        return Path(virtctl_env_path)
+        virtctl_path = Path(virtctl_env_path)
+        if not virtctl_path.is_file():
+            raise ValueError(f"VIRTCTL_PATH must point to a file, got: {virtctl_path}")
+        if not os.access(virtctl_path, os.X_OK):
+            raise ValueError(f"VIRTCTL_PATH must be executable, got: {virtctl_path}")
+        return virtctl_path
Suggestion importance[1-10]: 6


Why: The suggestion correctly identifies a typo introduced in the PR and improves robustness by adding validation for the VIRTCTL_PATH, preventing potential downstream errors with a clear, early failure.

Low
Possible issue
Make token polling deterministic

Improve token polling by adding validation for base64 and UTF-8 decoding,
handling potential errors gracefully.

conftest.py [876-895]

-def _has_token() -> str | None:
-    token_secret_ref.wait()
-    return (token_secret_ref.instance.data or {}).get("token")
+def _get_token_b64() -> str | None:
+    token_secret_ref.wait(timeout=10)
+    data = token_secret_ref.instance.data or {}
+    return data.get("token")
 
-token_b64 = None
 try:
-    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
+    token_b64 = None
+    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_get_token_b64):
         if sample:
             token_b64 = sample
             break
     if not token_b64:
         raise ValueError(
             f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
         )
 except TimeoutExpiredError:
     raise ValueError(
         f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
     ) from None
 
-token_value = base64.b64decode(token_b64).decode("utf-8")
+try:
+    token_value = base64.b64decode(token_b64, validate=True).decode("utf-8")
+except (ValueError, UnicodeDecodeError) as exc:
+    raise ValueError(f"Invalid token data in Secret {token_secret_name}") from exc
Suggestion importance[1-10]: 7


Why: The suggestion improves robustness by adding validation for token decoding and wrapping it in a try-except block, which prevents potential crashes from invalid data and provides clearer error messages.

Medium
Enhancement
Ensure failure is for authorization

Improve the test's accuracy by asserting on the specific error message within
the pytest.raises block to ensure the migration fails due to the expected SCC
denial.

tests/test_mtv_clusterrole_migration.py [115-128]

 class TestClusterroleColdMtvMigration:
     """Verify ClusterRole (forklift-migrator-role) without SCC fails cold migration."""
 ...
     def test_migrate_vms(
         self,
         fixture_store,
         ocp_admin_client,
         target_namespace,
     ):
         """Execute migration, expecting failure without SCC binding."""
-        with pytest.raises(MigrationPlanExecError):
+        with pytest.raises(MigrationPlanExecError, match=r"(scc|SecurityContextConstraints|forbidden|permission)"):
             execute_migration(
                 ocp_admin_client=ocp_admin_client,
                 fixture_store=fixture_store,
                 plan=self.plan_resource,
                 target_namespace=target_namespace,
             )
Suggestion importance[1-10]: 7


Why: The suggestion improves the test's precision by adding a match parameter to pytest.raises, ensuring the test fails for the specific reason intended (SCC permissions) rather than for any generic error.

Medium
✅ Suggestions up to commit b9caf3c
Category | Suggestion | Impact
Possible bug
Poll for VM running status

Improve the VM readiness check by polling for the RUNNING status instead of
checking it only once, making the verification more robust.

utilities/clusterrole_utils.py [93-103]

 for vm_config in prepared_plan["virtual_machines"]:
     vm = VirtualMachine(
         client=ocp_admin_client,
         name=vm_config["name"],
         namespace=target_namespace,
     )
     vm.wait(timeout=300)
-    assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-        f"VM {vm.name} is not Running after migration. "
-        f"Status: {vm.instance.status.printableStatus}"
-    )
 
+    def _vm_status() -> str | None:
+        vm.reload()
+        status = getattr(vm.instance, "status", None)
+        return getattr(status, "printableStatus", None)
+
+    for status in TimeoutSampler(wait_timeout=600, sleep=10, func=_vm_status):
+        if status == VirtualMachine.Status.RUNNING:
+            break
+    else:
+        final_status = _vm_status()
+        assert final_status == VirtualMachine.Status.RUNNING, (
+            f"VM {vm.name} is not Running after migration. Status: {final_status}"
+        )
+
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that a VM's status can be transient and improves the check by polling for the final RUNNING state, making the test verification more reliable.

Medium
Validate environment-provided binary path
Suggestion Impact: The code was changed to use a walrus-assigned `virtctl_env_path` variable and return `Path(virtctl_env_path)`, similar to the suggested refactor, but none of the proposed path validation checks were added and the duplicate comment was not fixed (it was effectively reintroduced).

code diff:

-    # if env variable VIRTCTL_PATH is set, use it.
-    if os.environ.get("VIRTCTL_PATH"):
-        return Path(os.environ.get("VIRTCTL_PATH"))
+    #if env variable VIRTCTL_PATH is set, use it.
+        # if env variable VIRTCTL_PATH is set, use it.
+    if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
+        return Path(virtctl_env_path)

Validate that the path provided by the VIRTCTL_PATH environment variable exists,
is a file, and is executable before returning it.

conftest.py [461-464]

-#if env variable VIRTCTL_PATH is set, use it.
-    # if env variable VIRTCTL_PATH is set, use it.
+# If env variable VIRTCTL_PATH is set, use it.
 if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
-    return Path(virtctl_env_path)
+    virtctl_path = Path(virtctl_env_path)
 
+    if not virtctl_path.exists():
+        raise ValueError(f"VIRTCTL_PATH does not exist: {virtctl_path}")
+    if not virtctl_path.is_file():
+        raise ValueError(f"VIRTCTL_PATH is not a file: {virtctl_path}")
+    if not os.access(virtctl_path, os.X_OK):
+        raise ValueError(f"VIRTCTL_PATH is not executable: {virtctl_path}")
+
+    return virtctl_path
+

[Suggestion processed]

Suggestion importance[1-10]: 6


Why: The suggestion improves robustness by validating the VIRTCTL_PATH environment variable, ensuring the provided path is an existing, executable file, which prevents later failures with unclear errors.

Low
Security
Harden token polling and decoding
Suggestion Impact: The helper function _has_token() was updated to include a return type annotation (-> str | None), matching part of the suggestion. However, the commit did not implement the proposed reload()-based polling changes or the validated base64 decoding/error handling.

code diff:

@@ -787,6 +786,18 @@
       4. Create Forklift Provider CR using that token
 
     Does NOT create the ClusterRole; forklift-migrator-role must already exist in the cluster.
+
+    Args:
+        fixture_store (dict[str, Any]): Fixture store for resource tracking and teardown.
+        ocp_admin_client (DynamicClient): OpenShift DynamicClient for cluster operations.
+        session_uuid (str): Unique session identifier for resource naming.
+        target_namespace (str): Namespace for provider resources.
+
+    Returns:
+        OCPProvider: Token-based OCP provider bound to forklift-migrator-role.
+
+    Raises:
+        ValueError: If the SA token is not populated within 60s.
     """
     sa_name = f"{session_uuid}-forklift-migrator-sa"
     binding_name = f"{session_uuid}-forklift-migrator-binding"
@@ -831,7 +842,7 @@
         namespace=target_namespace,
     )
 
-    def _has_token():
+    def _has_token() -> str | None:
         token_secret_ref.wait()
         return (token_secret_ref.instance.data or {}).get("token")

Improve token polling by using reload() instead of wait(), and add validation
with error handling for the base64 decoding step.

conftest.py [834-853]

-def _has_token():
-    token_secret_ref.wait()
+def _has_token() -> str | None:
+    token_secret_ref.reload()
     return (token_secret_ref.instance.data or {}).get("token")
 
-token_b64 = None
 try:
-    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
-        if sample:
-            token_b64 = sample
-            break
-    if not token_b64:
-        raise ValueError(
-            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-        )
-except TimeoutExpiredError:
+    token_b64 = next(
+        sample
+        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token)
+        if sample
+    )
+except (StopIteration, TimeoutExpiredError):
     raise ValueError(
         f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
     ) from None
 
-token_value = base64.b64decode(token_b64).decode("utf-8")
+try:
+    token_value = base64.b64decode(token_b64, validate=True).decode("utf-8")
+except (ValueError, UnicodeDecodeError):
+    raise ValueError(
+        f"Secret {token_secret_name} has an invalid token payload for ServiceAccount {sa_name}"
+    ) from None
Suggestion importance[1-10]: 7


Why: The suggestion improves the token polling logic by using reload() and adds validation for the base64 decoding, making the code more robust and preventing potential crashes with clear error messages.

Medium
✅ Suggestions up to commit 7587cdd
Category | Suggestion | Impact
Security
Validate environment-provided binary path
Suggestion Impact: The code was changed to use a walrus-assigned `virtctl_env_path` variable and return `Path(virtctl_env_path)`, similar to the suggested refactor, but none of the proposed path validation checks were added and the duplicate comment was not fixed (it was effectively reintroduced).

code diff:

-    # if env variable VIRTCTL_PATH is set, use it.
-    if os.environ.get("VIRTCTL_PATH"):
-        return Path(os.environ.get("VIRTCTL_PATH"))
+    #if env variable VIRTCTL_PATH is set, use it.
+        # if env variable VIRTCTL_PATH is set, use it.
+    if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
+        return Path(virtctl_env_path)

Add validation checks (absolute path, existence, file type, not a symlink,
executable) for the path provided via the VIRTCTL_PATH environment variable and
fix the duplicate comment.

conftest.py [461-464]

-#if env variable VIRTCTL_PATH is set, use it.
-    # if env variable VIRTCTL_PATH is set, use it.
+# If env variable VIRTCTL_PATH is set, use it.
 if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
-    return Path(virtctl_env_path)
+    virtctl_path = Path(virtctl_env_path)
 
+    if not virtctl_path.is_absolute():
+        raise ValueError(f"VIRTCTL_PATH must be an absolute path, got: {virtctl_env_path}")
+
+    if not virtctl_path.exists():
+        raise ValueError(f"VIRTCTL_PATH does not exist: {virtctl_env_path}")
+
+    if virtctl_path.is_symlink():
+        raise PermissionError(f"VIRTCTL_PATH must not be a symlink (hijack attempt): {virtctl_env_path}")
+
+    if not virtctl_path.is_file():
+        raise ValueError(f"VIRTCTL_PATH must point to a file, got: {virtctl_env_path}")
+
+    if not os.access(virtctl_path, os.X_OK):
+        raise ValueError(f"VIRTCTL binary is not executable: {virtctl_env_path}")
+
+    return virtctl_path
+
Suggestion importance[1-10]: 9


Why: The suggestion hardens the code by adding crucial validation for a path provided via an environment variable, fixing a potential security risk and improving robustness.

High
Possible bug
Refresh and safely decode token
Suggestion Impact: The commit updates the polling logic in clusterrole_destination_ocp_provider by introducing a persistent Secret reference and handling TimeoutExpiredError (importing it and wrapping the TimeoutSampler loop). However, it does not implement the suggested secret.refresh() inside the loop or the suggested base64.b64decode(..., validate=True) validation.

code diff:

-from timeout_sampler import TimeoutSampler
+from timeout_sampler import TimeoutExpiredError, TimeoutSampler
 
 from exceptions.exceptions import (
     ForkliftPodsNotRunningError,
@@ -458,9 +458,10 @@
         PermissionError: If shared directory is a symlink (hijack attempt).
         TimeoutError: If timeout waiting for file lock.
     """
-    # if env variable VIRTCTL_PATH is set, use it.
-    if os.environ.get("VIRTCTL_PATH"):
-        return Path(os.environ.get("VIRTCTL_PATH"))
+    #if env variable VIRTCTL_PATH is set, use it.
+        # if env variable VIRTCTL_PATH is set, use it.
+    if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
+        return Path(virtctl_env_path)
 
     # Get cluster version for versioned caching
     cluster_version = get_cluster_version(ocp_admin_client)
@@ -772,11 +773,11 @@
 
 @pytest.fixture(scope="session")
 def clusterrole_destination_ocp_provider(
-    fixture_store,
-    ocp_admin_client,
-    session_uuid,
-    target_namespace,
-):
+    fixture_store: dict[str, Any],
+    ocp_admin_client: "DynamicClient",
+    session_uuid: str,
+    target_namespace: str,
+) -> OCPProvider:
     """Create a token-based OCP provider using the existing forklift-migrator-role and a fresh SA.
 
     Verifies the flow:
@@ -793,7 +794,7 @@
     provider_name = f"{session_uuid}-clusterrole-destination-ocp-provider"
 
     # 1. Create a fresh ServiceAccount in target namespace
-    service_account = create_and_store_resource(
+    create_and_store_resource(
         client=ocp_admin_client,
         fixture_store=fixture_store,
         resource=ServiceAccount,
@@ -810,11 +811,10 @@
         cluster_role=FORKLIFT_MIGRATOR_ROLE_NAME,
         subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": target_namespace}],
     )
-    # Ensure binding is ready
     cluster_role_binding.wait()
 
     # Token secret: create Secret with type service-account-token so cluster populates token
-    token_secret = create_and_store_resource(
+    create_and_store_resource(
         client=ocp_admin_client,
         fixture_store=fixture_store,
         resource=Secret,
@@ -825,19 +825,30 @@
     )
 
     # Wait for token to be populated (controller fills it asynchronously)
+    token_secret_ref = Secret(
+        client=ocp_admin_client,
+        name=token_secret_name,
+        namespace=target_namespace,
+    )
+
     def _has_token():
-        s = Secret(client=ocp_admin_client, name=token_secret_name, namespace=target_namespace)
-        s.wait()
-        return (s.instance.data or {}).get("token")
-
-    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
-        if sample:
-            token_b64 = sample
-            break
-    else:
+        token_secret_ref.wait()
+        return (token_secret_ref.instance.data or {}).get("token")
+
+    token_b64 = None
+    try:
+        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
+            if sample:
+                token_b64 = sample
+                break
+        if not token_b64:
+            raise ValueError(
+                f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
+            )
+    except TimeoutExpiredError:
         raise ValueError(
             f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-        )
+        ) from None
 
     token_value = base64.b64decode(token_b64).decode("utf-8")
 

In the clusterrole_destination_ocp_provider fixture, refresh the token secret
object inside the polling loop to avoid using a stale object cache, and add
robust validation for base64 decoding.

conftest.py [828-853]

 token_secret_ref = Secret(
     client=ocp_admin_client,
     name=token_secret_name,
     namespace=target_namespace,
 )
 
-def _has_token():
+def _get_token_b64() -> str | None:
     token_secret_ref.wait()
-    return (token_secret_ref.instance.data or {}).get("token")
+    token_secret_ref.refresh()
+    token = (token_secret_ref.instance.data or {}).get("token")
+    return token if isinstance(token, str) and token else None
 
-token_b64 = None
+token_b64: str | None = None
 try:
-    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
-        if sample:
-            token_b64 = sample
-            break
-    if not token_b64:
-        raise ValueError(
-            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-        )
+    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_get_token_b64):
+        token_b64 = sample
+        break
 except TimeoutExpiredError:
+    token_b64 = None
+
+if token_b64 is None:
     raise ValueError(
         f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-    ) from None
+    )
 
-token_value = base64.b64decode(token_b64).decode("utf-8")
+try:
+    token_value = base64.b64decode(token_b64, validate=True).decode("utf-8")
+except Exception as exc:
+    raise ValueError(f"Invalid base64 token in Secret {token_secret_name}") from exc
Suggestion importance[1-10]: 8


Why: The suggestion fixes a potential race condition by refreshing the secret object within the polling loop, ensuring the test reliably gets the token once it's populated.

Medium
Possible issue
Poll for migrated resource consistency

In verify_configmap_migrated, replace the immediate assertion with a polling
mechanism to wait for the target ConfigMap to appear and its data to match the
source, accounting for replication delays.

utilities/clusterrole_utils.py [135-146]

 target_cm = ConfigMap(
     client=client,
     name=configmap_name,
     namespace=target_namespace,
 )
-assert target_cm.exists, f"ConfigMap {configmap_name} was not migrated to target namespace {target_namespace}"
 
-source_data = source_cm.instance.data or {}
-target_data = target_cm.instance.data or {}
-assert source_data == target_data, (
-    f"ConfigMap {configmap_name} data mismatch: source {source_data} != target {target_data}"
+def _target_cm_data() -> dict[str, Any] | None:
+    if not target_cm.exists:
+        return None
+    target_cm.refresh()
+    return target_cm.instance.data or {}
+
+source_data: dict[str, Any] = source_cm.instance.data or {}
+target_data: dict[str, Any] | None = None
+
+for sample in TimeoutSampler(wait_timeout=120, sleep=5, func=_target_cm_data):
+    if sample == source_data:
+        target_data = sample
+        break
+
+assert target_data == source_data, (
+    f"ConfigMap {configmap_name} was not migrated correctly to namespace {target_namespace}. "
+    f"Expected data: {source_data}. Got: {target_data}"
 )
Suggestion importance[1-10]: 7


Why: The suggestion improves test stability by polling for the migrated ConfigMap to account for eventual consistency, preventing potential flakes.

Medium
✅ Suggestions up to commit fe3c198
Category | Suggestion | Impact
Possible bug
Resolve merge conflict in fixture
Suggestion Impact: The commit partially addressed the merge conflict by removing the HEAD-side block and keeping the walrus-operator environment-variable check, but it did not fully resolve the conflict because a conflict marker (>>>>>>> ...) remains in the file.

code diff:

-<<<<<<< HEAD
-    # if env variable VIRTCTL_PATH is set, use it.
-    if os.environ.get("VIRTCTL_PATH"):
-        return Path(os.environ.get("VIRTCTL_PATH"))
-=======
     #if env variable VIRTCTL_PATH is set, use it.
         # if env variable VIRTCTL_PATH is set, use it.
     if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
         return Path(virtctl_env_path)
->>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)

Resolve the merge conflict in the virtctl_binary fixture in conftest.py by using
the version with the walrus operator to check for the VIRTCTL_PATH environment
variable.

conftest.py [461-470]

-<<<<<<< HEAD
-    # if env variable VIRTCTL_PATH is set, use it.
-    if os.environ.get("VIRTCTL_PATH"):
-        return Path(os.environ.get("VIRTCTL_PATH"))
-=======
-    #if env variable VIRTCTL_PATH is set, use it.
-        # if env variable VIRTCTL_PATH is set, use it.
-    if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
-        return Path(virtctl_env_path)
->>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
+# if env variable VIRTCTL_PATH is set, use it.
+if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
+    return Path(virtctl_env_path)
Suggestion importance[1-10]: 10


Why: The suggestion correctly identifies and resolves merge conflict markers (<<<<<<< HEAD, =======, >>>>>>>) in conftest.py, which would otherwise cause a syntax error and break the tests.

High
Remove conflict markers and duplicates
Suggestion Impact: The commit removed the leftover merge conflict markers and deleted the redundant per-VM status assertion loops (including one conflicting duplicate verify_vms_running block), leaving verify_vms_running as the sole VM-running validation.

code diff:

@@ -62,19 +62,6 @@
             target_namespace=target_namespace,
         )
 
-<<<<<<< HEAD
-        for vm_config in prepared_plan["virtual_machines"]:
-            vm = VirtualMachine(
-                client=ocp_admin_client,
-                name=vm_config["name"],
-                namespace=target_namespace,
-            )
-            assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
-            )
-
-=======
->>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
 
 @pytest.mark.remote
 @pytest.mark.tier0
@@ -125,19 +112,6 @@
             target_namespace=target_namespace,
         )
 
-<<<<<<< HEAD
-        for vm_config in prepared_plan["virtual_machines"]:
-            vm = VirtualMachine(
-                client=ocp_admin_client,
-                name=vm_config["name"],
-                namespace=target_namespace,
-            )
-            assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
-            )
-
-=======
->>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
 
 @pytest.mark.remote
 @pytest.mark.tier0
@@ -221,21 +195,9 @@
             secret_name=secret_name,
         )
 
-<<<<<<< HEAD
-        for vm_config in prepared_plan["virtual_machines"]:
-            vm = VirtualMachine(
-                client=ocp_admin_client,
-                name=vm_config["name"],
-                namespace=target_namespace,
-            )
-            assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
-            )
-=======
         verify_vms_running(
             ocp_admin_client=ocp_admin_client,
             prepared_plan=prepared_plan,
             target_namespace=target_namespace,
         )
->>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
-
+

Remove the merge conflict markers and the redundant VM status check from
test_mtv_clusterrole_cold_migration, as this logic is already handled by the
verify_vms_running function and would cause errors.

tests/test_mtv_clusterrole_migration.py [59-77]

 verify_vms_running(
     ocp_admin_client=ocp_admin_client,
     prepared_plan=prepared_plan,
     target_namespace=target_namespace,
 )
 
-<<<<<<< HEAD
-        for vm_config in prepared_plan["virtual_machines"]:
-            vm = VirtualMachine(
-                client=ocp_admin_client,
-                name=vm_config["name"],
-                namespace=target_namespace,
-            )
-            assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
-            )
-
-=======
->>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
-
Suggestion importance[1-10]: 10

__

Why: The suggestion correctly identifies merge conflict markers and redundant code in tests/test_mtv_clusterrole_migration.py, which would cause a syntax error and a NameError since VirtualMachine is not imported.

High
Possible issue
Make token polling more robust
Suggestion Impact: The commit refactored the token polling logic to reuse a Secret reference and improved error handling around polling timeouts, which aligns with making token polling more robust, but it did not add token_secret_ref.refresh() or base64.b64decode(validate=True).

code diff:

     # Wait for token to be populated (controller fills it asynchronously)
+    token_secret_ref = Secret(
+        client=ocp_admin_client,
+        name=token_secret_name,
+        namespace=target_namespace,
+    )
+
     def _has_token():
-        s = Secret(client=ocp_admin_client, name=token_secret_name, namespace=target_namespace)
-        s.wait()
-        return (s.instance.data or {}).get("token")
-
-    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
-        if sample:
-            token_b64 = sample
-            break
-    else:
+        token_secret_ref.wait()
+        return (token_secret_ref.instance.data or {}).get("token")
+
+    token_b64 = None
+    try:
+        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
+            if sample:
+                token_b64 = sample
+                break
+        if not token_b64:
+            raise ValueError(
+                f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
+            )
+    except TimeoutExpiredError:
         raise ValueError(
             f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
-        )
+        ) from None
 
     token_value = base64.b64decode(token_b64).decode("utf-8")
 

In the clusterrole_destination_ocp_provider fixture, add
token_secret_ref.refresh() before accessing secret data and use validate=True in
base64.b64decode to improve the reliability of token polling and decoding.

conftest.py [840-859]

 def _has_token():
     token_secret_ref.wait()
-    return (token_secret_ref.instance.data or {}).get("token")
+    token_secret_ref.refresh()
+    token_data: dict[str, Any] = token_secret_ref.instance.data or {}
+    return token_data.get("token")
 ...
-token_value = base64.b64decode(token_b64).decode("utf-8")
+token_value = base64.b64decode(token_b64, validate=True).decode("utf-8")
Suggestion importance[1-10]: 6

__

Why: The suggestion improves robustness by adding an explicit refresh() call to prevent reading stale data and adds validation to the b64decode call, making the token retrieval process more reliable.

Low
✅ Suggestions up to commit d127edb
Category | Suggestion | Impact
Reliability
Wait for VM readiness
Suggestion Impact: The tests were refactored to call a shared helper (verify_vms_running) to validate VMs are running after migration, which likely encapsulates waiting/retry behavior instead of immediately asserting VM status inline.

code diff:

+        verify_vms_running(
+            ocp_admin_client=ocp_admin_client,
+            prepared_plan=prepared_plan,
+            target_namespace=target_namespace,
+        )
+
+<<<<<<< HEAD
         for vm_config in prepared_plan["virtual_machines"]:
             vm = VirtualMachine(
                 client=ocp_admin_client,
@@ -93,10 +70,11 @@
                 namespace=target_namespace,
             )
             assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. "
-                f"Status: {vm.instance.status.printableStatus}"
-            )
-
+                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
+            )
+
+=======
+>>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
 
 @pytest.mark.remote
 @pytest.mark.tier0
@@ -130,45 +108,24 @@
         Steps: ClusterRole/SA/token/provider by fixture; create NetworkMap and StorageMap;
         create Plan, run warm migration, verify VMs running.
         """
-        vms = [vm["name"] for vm in prepared_plan["virtual_machines"]]
-        storage_map = get_storage_migration_map(
-            fixture_store=fixture_store,
-            target_namespace=target_namespace,
+        run_clusterrole_migration(
+            ocp_admin_client=ocp_admin_client,
+            fixture_store=fixture_store,
             source_provider=source_provider,
             destination_provider=clusterrole_destination_ocp_provider,
-            ocp_admin_client=ocp_admin_client,
+            prepared_plan=prepared_plan,
             source_provider_inventory=source_provider_inventory,
-            vms=vms,
-        )
-        network_map = get_network_migration_map(
-            fixture_store=fixture_store,
-            source_provider=source_provider,
-            destination_provider=clusterrole_destination_ocp_provider,
+            target_namespace=target_namespace,
             multus_network_name=multus_network_name,
-            ocp_admin_client=ocp_admin_client,
-            target_namespace=target_namespace,
-            source_provider_inventory=source_provider_inventory,
-            vms=vms,
-        )
-        populate_vm_ids(prepared_plan, source_provider_inventory)
-        plan_resource = create_plan_resource(
-            ocp_admin_client=ocp_admin_client,
-            fixture_store=fixture_store,
-            source_provider=source_provider,
-            destination_provider=clusterrole_destination_ocp_provider,
-            storage_map=storage_map,
-            network_map=network_map,
-            virtual_machines_list=prepared_plan["virtual_machines"],
-            target_namespace=target_namespace,
-            warm_migration=prepared_plan.get("warm_migration", False),
-        )
-        execute_migration(
-            ocp_admin_client=ocp_admin_client,
-            fixture_store=fixture_store,
-            plan=plan_resource,
-            target_namespace=target_namespace,
-        )
-
+            cut_over=get_cutover_value(),
+        )
+        verify_vms_running(
+            ocp_admin_client=ocp_admin_client,
+            prepared_plan=prepared_plan,
+            target_namespace=target_namespace,
+        )
+
+<<<<<<< HEAD
         for vm_config in prepared_plan["virtual_machines"]:
             vm = VirtualMachine(
                 client=ocp_admin_client,
@@ -176,10 +133,11 @@
                 namespace=target_namespace,
             )
             assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. "
-                f"Status: {vm.instance.status.printableStatus}"
-            )
-
+                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
+            )
+
+=======
+>>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
 
 @pytest.mark.remote
 @pytest.mark.tier0
@@ -213,6 +171,11 @@
         Creates ConfigMap and Secret in source namespace, migrates VM, verifies both
         were migrated to target namespace using verify_configmap_migrated and verify_secret_migrated.
         """
+        if source_vms_namespace is None:
+            pytest.skip(
+                "source_vms_namespace is None: test requires an OpenShift source provider"
+            )
+
         configmap_name = f"{fixture_store['session_uuid']}-test-configmap"
         create_and_store_resource(
             client=ocp_admin_client,
@@ -233,43 +196,15 @@
             string_data={"username": "testuser", "password": "testpass"},
         )
 
-        vms = [vm["name"] for vm in prepared_plan["virtual_machines"]]
-        storage_map = get_storage_migration_map(
-            fixture_store=fixture_store,
-            target_namespace=target_namespace,
+        run_clusterrole_migration(
+            ocp_admin_client=ocp_admin_client,
+            fixture_store=fixture_store,
             source_provider=source_provider,
             destination_provider=clusterrole_destination_ocp_provider,
-            ocp_admin_client=ocp_admin_client,
+            prepared_plan=prepared_plan,
             source_provider_inventory=source_provider_inventory,
-            vms=vms,
-        )
-        network_map = get_network_migration_map(
-            fixture_store=fixture_store,
-            source_provider=source_provider,
-            destination_provider=clusterrole_destination_ocp_provider,
+            target_namespace=target_namespace,
             multus_network_name=multus_network_name,
-            ocp_admin_client=ocp_admin_client,
-            target_namespace=target_namespace,
-            source_provider_inventory=source_provider_inventory,
-            vms=vms,
-        )
-        populate_vm_ids(prepared_plan, source_provider_inventory)
-        plan_resource = create_plan_resource(
-            ocp_admin_client=ocp_admin_client,
-            fixture_store=fixture_store,
-            source_provider=source_provider,
-            destination_provider=clusterrole_destination_ocp_provider,
-            storage_map=storage_map,
-            network_map=network_map,
-            virtual_machines_list=prepared_plan["virtual_machines"],
-            target_namespace=target_namespace,
-            warm_migration=prepared_plan.get("warm_migration", False),
-        )
-        execute_migration(
-            ocp_admin_client=ocp_admin_client,
-            fixture_store=fixture_store,
-            plan=plan_resource,
-            target_namespace=target_namespace,
         )
 
         verify_configmap_migrated(
@@ -286,6 +221,7 @@
             secret_name=secret_name,
         )
 
+<<<<<<< HEAD
         for vm_config in prepared_plan["virtual_machines"]:
             vm = VirtualMachine(
                 client=ocp_admin_client,
@@ -293,7 +229,13 @@
                 namespace=target_namespace,
             )
             assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
-                f"VM {vm.name} is not Running after migration. "
-                f"Status: {vm.instance.status.printableStatus}"
-            )
-
+                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
+            )
+=======
+        verify_vms_running(
+            ocp_admin_client=ocp_admin_client,
+            prepared_plan=prepared_plan,
+            target_namespace=target_namespace,
+        )

Add vm.wait() before asserting the VirtualMachine status to allow time for it to
reach the 'Running' state.

tests/test_mtv_clusterrole_migration.py [90-98]

 vm = VirtualMachine(
     client=ocp_admin_client,
     name=vm_config["name"],
     namespace=target_namespace,
 )
+vm.wait(timeout=300)
 assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
     f"VM {vm.name} is not Running after migration. "
     f"Status: {vm.instance.status.printableStatus}"
 )
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly points out that checking the VM status immediately after migration can be flaky; adding a wait improves test reliability.

Medium
High-level
Refactor tests to eliminate code duplication
Suggestion Impact: The repeated test logic for building maps/plan and executing migrations was replaced by calls to a shared helper (run_clusterrole_migration) and a shared VM verification helper (verify_vms_running), reducing duplication across the test methods.

code diff:

-from utilities.clusterrole_utils import verify_configmap_migrated, verify_secret_migrated
-from utilities.mtv_migration import (
-    create_plan_resource,
-    execute_migration,
-    get_network_migration_map,
-    get_storage_migration_map,
-)
+from utilities.clusterrole_utils import (
+    run_clusterrole_migration,
+    verify_configmap_migrated,
+    verify_secret_migrated,
+    verify_vms_running,
+)
+from utilities.migration_utils import get_cutover_value
 from utilities.resources import create_and_store_resource
-from utilities.utils import get_value_from_py_config, populate_vm_ids
+from utilities.utils import get_value_from_py_config
 
 
 @pytest.mark.remote
@@ -47,45 +46,23 @@
         Steps: ClusterRole/SA/token/provider by fixture; create NetworkMap and StorageMap;
         create Plan, run migration, verify VMs running.
         """
-        vms = [vm["name"] for vm in prepared_plan["virtual_machines"]]
-        storage_map = get_storage_migration_map(
-            fixture_store=fixture_store,
-            target_namespace=target_namespace,
+        run_clusterrole_migration(
+            ocp_admin_client=ocp_admin_client,
+            fixture_store=fixture_store,
             source_provider=source_provider,
             destination_provider=clusterrole_destination_ocp_provider,
-            ocp_admin_client=ocp_admin_client,
+            prepared_plan=prepared_plan,
             source_provider_inventory=source_provider_inventory,
-            vms=vms,
-        )
-        network_map = get_network_migration_map(
-            fixture_store=fixture_store,
-            source_provider=source_provider,
-            destination_provider=clusterrole_destination_ocp_provider,
+            target_namespace=target_namespace,
             multus_network_name...


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 11

🤖 Fix all issues with AI agents
In `@conftest.py`:
- Around line 795-802: The local variables service_account and token_secret are
assigned from create_and_store_resource but never used; either drop the
assignments or rename them to _service_account and _token_secret to indicate
intentional discard. Update the calls to create_and_store_resource (the
invocations that currently assign to service_account and token_secret) so they
do not create unused locals, ensuring fixture_store still receives/stores the
resources as before; apply the same change to the other occurrences around the
create_and_store_resource calls referenced in the review.
- Around line 864-867: The fixture currently yields an OCPProvider instance
(yield OCPProvider(ocp_resource=provider, fixture_store=fixture_store)) but has
no teardown logic; replace the yield with a return to follow Ruff PT022 and make
the intent explicit that this fixture has no local teardown (i.e., change to
return OCPProvider(...)). Keep the same arguments (ocp_resource=provider,
fixture_store=fixture_store) and rely on fixture_store for session-level
cleanup.
- Around line 461-463: Replace the double call to os.environ.get("VIRTCTL_PATH")
with a single lookup stored in a local variable (e.g., virtctl =
os.environ.get("VIRTCTL_PATH")), then check that variable (if virtctl:) and
return Path(virtctl); modify the block containing the current os.environ.get
usage so only one environment lookup is performed and Path() is invoked with the
stored value.
- Around line 773-779: Add full type annotations to the fixture function
clusterrole_destination_ocp_provider: annotate its return type as
Generator[OCPProvider, None, None] and annotate each parameter (fixture_store,
ocp_admin_client, session_uuid, target_namespace) with their appropriate types
(e.g., fixture_store: dict, ocp_admin_client: OCPClient or the actual client
type used in the repo, session_uuid: str, target_namespace: str) so the
signature uses built-in typing and matches the guideline (Generator is already
imported).
- Around line 828-831: Currently `_has_token` re-instantiates Secret on every
poll; instead create the Secret wrapper once (Secret(client=ocp_admin_client,
name=token_secret_name, namespace=target_namespace)) in the outer scope and have
`_has_token` only call the existing instance's refresh/wait and read its data
(e.g., call s.wait() or s.reload()/s.refresh() on that single `s` and return
(s.instance.data or {}).get("token")). Update references to `_has_token`,
`Secret`, `s.wait()`, and `s.instance.data` accordingly so the wrapper isn't
recreated each poll.
- Around line 827-840: The TimeoutSampler loop using _has_token should catch
TimeoutExpiredError instead of relying on a for-else (which is unreachable);
wrap the for sample in a try/except that imports TimeoutExpiredError, on success
set token_b64 = sample as now, and in the except TimeoutExpiredError raise a
ValueError that includes token_secret_name and sa_name to preserve the
descriptive error path.

In `@tests/test_mtv_clusterrole_migration.py`:
- Around line 198-232: test_mtv_clusterrole_configmap_secret_migration passes
source_vms_namespace (which can be None for non-OpenShift sources) into
create_and_store_resource causing failures; add an early guard at the start of
test_mtv_clusterrole_configmap_secret_migration to assert or skip when
source_vms_namespace is None (e.g., pytest.skip("source_vms_namespace is None:
test requires an OpenShift source") or raise a clear ValueError) before calling
create_and_store_resource for ConfigMap and Secret, referencing the function
name create_and_store_resource and the test function
test_mtv_clusterrole_configmap_secret_migration so the intent and location are
obvious.
- Around line 34-97: Extract the duplicated migration flow and VM verification
into shared helpers and call them from test_mtv_clusterrole_cold_migration (and
the other two tests); specifically, create a utility (e.g.,
utilities/clusterrole_utils.py) with a run_clusterrole_migration(...) function
that encapsulates the sequence using get_storage_migration_map,
get_network_migration_map, populate_vm_ids, create_plan_resource, and
execute_migration, and a verify_vms_running(...) function that iterates
prepared_plan["virtual_machines"] and asserts each
VirtualMachine.instance.status.printableStatus == VirtualMachine.Status.RUNNING;
then replace the duplicated blocks in test_mtv_clusterrole_cold_migration, the
warm test, and the configmap/secret test to call these helpers.
- Around line 116-169: The test_mtv_clusterrole_warm_migration test calls
execute_migration(...) without the cut_over parameter, so update the
execute_migration call to include cut_over=get_cutover_value() (matching other
warm migration tests) to ensure the warm migration completes its cutover phase;
locate the execute_migration invocation in test_mtv_clusterrole_warm_migration
and add the cut_over argument accordingly.

In `@utilities/clusterrole_utils.py`:
- Around line 42-43: The source/target existence checks in
verify_configmap_migrated (the `if not source_cm.exists: raise
AssertionError(...)` block) and verify_secret_migrated use mixed styles; make
them consistent by replacing explicit `raise AssertionError(...)` with an
`assert ... , msg` (or conversely replace the `assert` with `raise
AssertionError(msg)`) across both functions, update the message text to match
the existing assertions, and if you intentionally want different semantics add a
brief inline comment above the chosen form explaining why that form
(precondition vs test assertion) is required.
- Around line 60-103: The docstring for verify_secret_migrated is misleading: it
claims it "compares decoded data" but the implementation compares
Secret.instance.data (base64-encoded) values; either update the docstring to say
it compares raw/encoded Secret data or change the code to decode both source and
target data before comparison; locate the function verify_secret_migrated and
the use of Secret.instance.data and adjust accordingly (and optionally simplify
the per-key loop to a single equality check like source_data == target_data if
you prefer consistency with the ConfigMap helper).

@qodo-code-review

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 Security concerns

Sensitive credential handling:
The new clusterrole_destination_ocp_provider fixture decodes a ServiceAccount token and writes it into a Provider Secret via string_data (and sets insecureSkipVerify to true). While this is likely acceptable for test environments, it is still a real cluster credential—ensure it is not logged, is scoped to the minimal RBAC required, and that cleanup reliably deletes the created Secret/ServiceAccount/ClusterRoleBinding.

⚡ Recommended focus areas for review

Merge Conflict

The virtctl_binary fixture contains unresolved git merge-conflict markers, which will break parsing/execution and must be resolved to a single implementation for handling VIRTCTL_PATH.

<<<<<<< HEAD
    # if env variable VIRTCTL_PATH is set, use it.
    if os.environ.get("VIRTCTL_PATH"):
        return Path(os.environ.get("VIRTCTL_PATH"))
=======
    #if env variable VIRTCTL_PATH is set, use it.
        # if env variable VIRTCTL_PATH is set, use it.
    if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
        return Path(virtctl_env_path)
>>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
Merge Conflict

The test file includes unresolved merge-conflict markers and duplicated VM-running verification logic; this will fail test collection and should be reduced to a single, consistent post-migration verification approach.

<<<<<<< HEAD
        for vm_config in prepared_plan["virtual_machines"]:
            vm = VirtualMachine(
                client=ocp_admin_client,
                name=vm_config["name"],
                namespace=target_namespace,
            )
            assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
                f"VM {vm.name} is not Running after migration. Status: {vm.instance.status.printableStatus}"
            )

=======
>>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)
Merge Conflict

Unresolved merge-conflict markers exist in the ConfigMap/Secret verification helpers, and there is duplicated/contradictory assertion logic in the Secret verification that should be unified.

<<<<<<< HEAD
    if not source_cm.exists:
        raise AssertionError(f"Source ConfigMap {configmap_name} not found in namespace {source_namespace}")
=======
    assert source_cm.exists, (
        f"Source ConfigMap {configmap_name} not found in namespace {source_namespace}"
    )
>>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)

    target_cm = ConfigMap(
        client=client,
        name=configmap_name,
        namespace=target_namespace,
    )
    assert target_cm.exists, f"ConfigMap {configmap_name} was not migrated to target namespace {target_namespace}"

    source_data = source_cm.instance.data or {}
    target_data = target_cm.instance.data or {}
    assert source_data == target_data, (
        f"ConfigMap {configmap_name} data mismatch: source {source_data} != target {target_data}"
    )
    LOGGER.info(f"ConfigMap {configmap_name} verified in target namespace {target_namespace}")


def verify_secret_migrated(
    client: "DynamicClient",
    source_namespace: str,
    target_namespace: str,
    secret_name: str,
) -> None:
    """Verify that a Secret was migrated from source to target namespace.

    Asserts the Secret exists in the target namespace and that its data
    matches the source Secret (compares raw Secret.data, base64-encoded).

    Args:
        client: OpenShift/Kubernetes client.
        source_namespace: Namespace where the Secret was created before migration.
        target_namespace: Namespace where the Secret should exist after migration.
        secret_name: Name of the Secret.

    Raises:
        AssertionError: If Secret is missing in target or data does not match.
    """
    source_secret = Secret(
        client=client,
        name=secret_name,
        namespace=source_namespace,
    )
<<<<<<< HEAD
    if not source_secret.exists:
        raise AssertionError(f"Source Secret {secret_name} not found in namespace {source_namespace}")
=======
    assert source_secret.exists, (
        f"Source Secret {secret_name} not found in namespace {source_namespace}"
    )
>>>>>>> 629918e (refactor: Address CodeRabbit review for MTV-3129 ClusterRole tests)

    target_secret = Secret(
        client=client,
        name=secret_name,
        namespace=target_namespace,
    )
    assert target_secret.exists, f"Secret {secret_name} was not migrated to target namespace {target_namespace}"

    source_data = source_secret.instance.data or {}
    target_data = target_secret.instance.data or {}
    assert source_data == target_data, (
        f"Secret {secret_name} data mismatch between source and target"
    )
<<<<<<< HEAD
    for key in source_data:
        assert source_data[key] == target_data[key], (
            f"Secret {secret_name} data for key {key} mismatch between source and target"
        )
    LOGGER.info(f"Secret {secret_name} verified in target namespace {target_namespace}")
=======
    LOGGER.info(
        f"Secret {secret_name} verified in target namespace {target_namespace}"
    )

💡 Tool usage guide:

Overview:
The review tool scans the PR code changes, and generates a PR review which includes several types of feedbacks, such as possible PR issues, security threats and relevant test in the PR. More feedbacks can be added by configuring the tool.

The tool can be triggered automatically every time a new PR is opened, or can be invoked manually by commenting on any PR.

  • When commenting, to edit configurations related to the review tool (pr_reviewer section), use the following template:
/review --pr_reviewer.some_config1=... --pr_reviewer.some_config2=...
[pr_reviewer]
some_config1=...
some_config2=...

See the review usage page for a comprehensive guide on using this tool.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@conftest.py`:
- Around line 461-470: conftest.py contains leftover git merge conflict markers
around the virtctl_binary logic which makes the module unparseable; remove the
conflict markers (<<<<<<<, =======, >>>>>>>) and keep the refactored
walrus-operator variant: in the virtctl_binary function use the assignment
expression (virtctl_env_path := os.environ.get("VIRTCTL_PATH")) and return
Path(virtctl_env_path) when truthy, ensuring the surrounding comment is tidy and
only one branch remains.

In `@tests/test_mtv_clusterrole_migration.py`:
- Around line 65-77: This file contains leftover git merge conflict markers
around three blocks that duplicate inline VM-check loops; remove the conflict
markers and the HEAD-side inline loops so the refactored version that calls the
helper verify_vms_running remains. Specifically, delete the <<<<<<< / ======= /
>>>>>>> regions and the duplicated for vm_config ... VirtualMachine(...)
assertions (the ones referencing prepared_plan, vm_config, VirtualMachine and
checking vm.instance.status.printableStatus) in all three locations and ensure
each test simply calls verify_vms_running(...) as intended.
- Around line 174-177: Replace the in-test runtime call to pytest.skip() for
source_vms_namespace with a collection-time skip marker: add an
`@pytest.mark.skipif` on the test (or enclosing test class) that checks
py_config.get("source_provider_type") != "openshift" and includes a clear reason
string, so the guard is evaluated at collection rather than inside the test
body; if py_config cannot be relied on at collection time, document the
exception near the test instead of calling pytest.skip() at runtime. Ensure you
update the test that currently references source_vms_namespace so the skip
condition is applied via the decorator rather than inside the function body (a minimal skipif sketch follows this list).

In `@utilities/clusterrole_utils.py`:
- Around line 31-45: Add full docstrings to the functions missing them: update
run_clusterrole_migration and verify_vms_running to include an initial summary
plus explicit Args, Returns, and Raises sections; for run_clusterrole_migration
document parameters (ocp_admin_client, fixture_store, source_provider,
destination_provider, prepared_plan, source_provider_inventory,
target_namespace, multus_network_name, cut_over), the return type (None), and
any exceptions that may be raised (e.g., migration/plan creation errors or wait
timeouts), and do the same for verify_vms_running documenting its parameters,
return value, and possible exceptions so both functions comply with the
project's docstring guidelines.
- Around line 131-138: This file contains unresolved git conflict markers that
break Python parsing; remove all conflict markers (<<<<<<<, =======, >>>>>>>) in
the three regions and keep the refactor/assert-style branches instead of the
explicit raise branches: replace the source_cm presence check with the assert
form (assert source_cm.exists, f"Source ConfigMap {configmap_name} not found in
namespace {source_namespace}"), likewise choose the assert-style lines for the
other two conflict regions (the checks referencing the same ConfigMap/secret
existence variables used later in this module), then run a quick import or lint
to ensure there are no remaining markers and the module parses cleanly.
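
A minimal sketch of the collection-time guard suggested for test_mtv_clusterrole_configmap_secret_migration, assuming py_config is the repo's test configuration mapping (imported here from pytest_testconfig, which may not match the repo's actual import path) and that "source_provider_type" is the key named in the comment above; this is an illustration, not the project's existing code.

import pytest
from pytest_testconfig import config as py_config  # assumed import; reuse whichever module already exposes py_config in this repo

# Evaluated at collection time instead of calling pytest.skip() inside the test body.
requires_openshift_source = pytest.mark.skipif(
    py_config.get("source_provider_type") != "openshift",
    reason="ConfigMap/Secret migration test requires an OpenShift source provider",
)

# Applied as `@requires_openshift_source` on test_mtv_clusterrole_configmap_secret_migration
# (or on its test class) so the guard runs before any fixtures are set up.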

@qodo-code-review

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 Security concerns

Sensitive credential handling:
The fixture creates a Provider Secret containing a live ServiceAccount token (string_data={"token": token_value, ...}) and also sets insecureSkipVerify to "true". Even for tests, this increases the blast radius if the namespace is shared or logs/events expose secret names. At minimum, ensure strict cleanup of the token Secret/Provider Secret and consider scoping access (short-lived token if possible) and avoiding insecureSkipVerify unless explicitly required.

⚡ Recommended focus areas for review

Syntax/Indent Bug

The virtctl_binary fixture has malformed indentation and duplicated comment lines around the VIRTCTL_PATH handling, which can cause an IndentationError or unintended block structure. Also, returning the env path without validating existence/executability can lead to confusing failures later.

    Path: Path to the downloaded virtctl binary.

Raises:
    ValueError: If virtctl download fails or binary is not executable.
    PermissionError: If shared directory ownership doesn't match current user.
    PermissionError: If shared directory is a symlink (hijack attempt).
    TimeoutError: If timeout waiting for file lock.
"""
#if env variable VIRTCTL_PATH is set, use it.
    # if env variable VIRTCTL_PATH is set, use it.
if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
    return Path(virtctl_env_path)

# Get cluster version for versioned caching
Fixture Cleanup

clusterrole_destination_ocp_provider is declared as a session-scoped fixture but returns an OCPProvider directly instead of yielding; this prevents any teardown logic from running inside the fixture itself. Even if create_and_store_resource tracks objects, reviewers should confirm resources (ServiceAccount, ClusterRoleBinding, token Secret, provider Secret, Provider CR) are reliably cleaned up and don’t leak across sessions.

@pytest.fixture(scope="session")
def clusterrole_destination_ocp_provider(
    fixture_store: dict[str, Any],
    ocp_admin_client: "DynamicClient",
    session_uuid: str,
    target_namespace: str,
) -> OCPProvider:
    """Create a token-based OCP provider using the existing forklift-migrator-role and a fresh SA.

    Verifies the flow:
      1. Create a fresh ServiceAccount (in target namespace)
      2. Bind it ONLY to the existing ClusterRole forklift-migrator-role (from operator/PR)
      3. Create a token for that SA (equivalent to: oc create token <sa> -n <ns>)
      4. Create Forklift Provider CR using that token

    Does NOT create the ClusterRole; forklift-migrator-role must already exist in the cluster.
    """
    sa_name = f"{session_uuid}-forklift-migrator-sa"
    binding_name = f"{session_uuid}-forklift-migrator-binding"
    token_secret_name = f"{session_uuid}-clusterrole-token"
    provider_name = f"{session_uuid}-clusterrole-destination-ocp-provider"

    # 1. Create a fresh ServiceAccount in target namespace
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ServiceAccount,
        name=sa_name,
        namespace=target_namespace,
    )

    # 2. Bind it ONLY to the existing forklift-migrator-role (from PR)
    cluster_role_binding = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ClusterRoleBinding,
        name=binding_name,
        cluster_role=FORKLIFT_MIGRATOR_ROLE_NAME,
        subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": target_namespace}],
    )
    cluster_role_binding.wait()

    # Token secret: create Secret with type service-account-token so cluster populates token
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=token_secret_name,
        namespace=target_namespace,
        type="kubernetes.io/service-account-token",
        annotations={"kubernetes.io/service-account.name": sa_name},
    )

    # Wait for token to be populated (controller fills it asynchronously)
    token_secret_ref = Secret(
        client=ocp_admin_client,
        name=token_secret_name,
        namespace=target_namespace,
    )

    def _has_token():
        token_secret_ref.wait()
        return (token_secret_ref.instance.data or {}).get("token")

    token_b64 = None
    try:
        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
            if sample:
                token_b64 = sample
                break
        if not token_b64:
            raise ValueError(
                f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
            )
    except TimeoutExpiredError:
        raise ValueError(
            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
        ) from None

    token_value = base64.b64decode(token_b64).decode("utf-8")

    # Provider secret (Forklift expects "token" and "insecureSkipVerify")
    provider_secret = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=f"{provider_name}-secret",
        namespace=target_namespace,
        string_data={"token": token_value, "insecureSkipVerify": "true"},
    )

    # Provider CR
    provider = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Provider,
        name=provider_name,
        namespace=target_namespace,
        secret_name=provider_secret.name,
        secret_namespace=provider_secret.namespace,
        url=ocp_admin_client.configuration.host,
        provider_type=Provider.ProviderType.OPENSHIFT,
    )

    return OCPProvider(ocp_resource=provider, fixture_store=fixture_store)
Defensive Checks

verify_vms_running assumes vm.instance.status.printableStatus is always present after wait(). In transient states or API delays, status/printableStatus can be missing and raise an attribute error; add defensive access and a clearer assertion message to avoid masking the real failure mode.

def verify_vms_running(
    ocp_admin_client: "DynamicClient",
    prepared_plan: dict[str, Any],
    target_namespace: str,
) -> None:
    """Assert each VM in the plan is Running in the target namespace."""
    for vm_config in prepared_plan["virtual_machines"]:
        vm = VirtualMachine(
            client=ocp_admin_client,
            name=vm_config["name"],
            namespace=target_namespace,
        )
        vm.wait(timeout=300)
        assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
            f"VM {vm.name} is not Running after migration. "
            f"Status: {vm.instance.status.printableStatus}"
        )
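
A minimal null-safe sketch for the concern above, assuming the VirtualMachine wrapper used in the quoted code; the _printable_status helper name is illustrative, not existing repo code.

def _printable_status(vm) -> str | None:
    """Null-safe read of status.printableStatus; None means the VM has not reported a status yet."""
    status = getattr(vm.instance, "status", None)
    return getattr(status, "printableStatus", None) if status else None

# The assertion in verify_vms_running could then tolerate a missing status:
#     current = _printable_status(vm)
#     assert current == VirtualMachine.Status.RUNNING, (
#         f"VM {vm.name} is not Running after migration. "
#         f"Status: {current or '<no status reported>'}"
#     )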



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@conftest.py`:
- Around line 834-836: The inner helper _has_token currently lacks a return type
annotation; update its signature to include the appropriate type (e.g., ->
Optional[str]) so the function reads as def _has_token() -> Optional[str]: and
ensure typing.Optional (or Optional) is imported if not already present; keep
the body unchanged (token_secret_ref.wait() and return
(token_secret_ref.instance.data or {}).get("token")) so static checkers like
Ruff ANN202 are satisfied.
- Around line 774-790: The fixture clusterrole_destination_ocp_provider is
missing formal docstring sections; update its docstring to include clearly
labeled Args (describing fixture_store, ocp_admin_client, session_uuid,
target_namespace and their types/roles), Returns (describe it returns an
OCPProvider), and Raises (list possible exceptions/errors the fixture may raise,
e.g., errors creating SA/token or missing ClusterRole), keeping the existing
narrative summary and ensuring wording matches project docstring style
guidelines.
- Around line 461-462: Remove the duplicate comment line in conftest.py (the
repeated "# if env variable VIRTCTL_PATH is set, use it.") so only a single
instance of that comment remains; locate the duplicated lines around the
VIRTCTL_PATH comment block and delete one of them to avoid redundancy.

@qodo-code-review

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 Security concerns

Sensitive information exposure:
The new fixture creates a real ServiceAccount token and stores it into a Provider Secret (string_data={"token": ...}) in-cluster. That is expected for the test, but it increases the blast radius if the namespace is not isolated or if logs/failed dumps capture Secret contents. Also, the new test creates a Secret with string_data containing username/password literals; even though they are dummy values, they can still trigger secret-scanning policies—prefer non-credential-like placeholders or clearly marked test-only values.

⚡ Recommended focus areas for review

Syntax Risk

The new VIRTCTL_PATH handling has suspicious indentation and duplicated commented lines, which can easily introduce an IndentationError or accidental dead code paths. Additionally, returning the env-provided path without validation can cause confusing failures later (non-existent path, not executable, directory instead of file).

    Path: Path to the downloaded virtctl binary.

Raises:
    ValueError: If virtctl download fails or binary is not executable.
    PermissionError: If shared directory ownership doesn't match current user.
    PermissionError: If shared directory is a symlink (hijack attempt).
    TimeoutError: If timeout waiting for file lock.
"""
#if env variable VIRTCTL_PATH is set, use it.
    # if env variable VIRTCTL_PATH is set, use it.
if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
    return Path(virtctl_env_path)
Flaky Token

The session-scoped token provisioning relies on creating a kubernetes.io/service-account-token Secret and waiting for asynchronous population, which is deprecated/disabled in some clusters and can be flaky. The base64 decode path also lacks defensive handling for invalid/missing token data. Consider using TokenRequest (or an oc create token equivalent API) and add explicit error handling for decode failures.

@pytest.fixture(scope="session")
def clusterrole_destination_ocp_provider(
    fixture_store: dict[str, Any],
    ocp_admin_client: "DynamicClient",
    session_uuid: str,
    target_namespace: str,
) -> OCPProvider:
    """Create a token-based OCP provider using the existing forklift-migrator-role and a fresh SA.

    Verifies the flow:
      1. Create a fresh ServiceAccount (in target namespace)
      2. Bind it ONLY to the existing ClusterRole forklift-migrator-role (from operator/PR)
      3. Create a token for that SA (equivalent to: oc create token <sa> -n <ns>)
      4. Create Forklift Provider CR using that token

    Does NOT create the ClusterRole; forklift-migrator-role must already exist in the cluster.
    """
    sa_name = f"{session_uuid}-forklift-migrator-sa"
    binding_name = f"{session_uuid}-forklift-migrator-binding"
    token_secret_name = f"{session_uuid}-clusterrole-token"
    provider_name = f"{session_uuid}-clusterrole-destination-ocp-provider"

    # 1. Create a fresh ServiceAccount in target namespace
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ServiceAccount,
        name=sa_name,
        namespace=target_namespace,
    )

    # 2. Bind it ONLY to the existing forklift-migrator-role (from PR)
    cluster_role_binding = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ClusterRoleBinding,
        name=binding_name,
        cluster_role=FORKLIFT_MIGRATOR_ROLE_NAME,
        subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": target_namespace}],
    )
    cluster_role_binding.wait()

    # Token secret: create Secret with type service-account-token so cluster populates token
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=token_secret_name,
        namespace=target_namespace,
        type="kubernetes.io/service-account-token",
        annotations={"kubernetes.io/service-account.name": sa_name},
    )

    # Wait for token to be populated (controller fills it asynchronously)
    token_secret_ref = Secret(
        client=ocp_admin_client,
        name=token_secret_name,
        namespace=target_namespace,
    )

    def _has_token():
        token_secret_ref.wait()
        return (token_secret_ref.instance.data or {}).get("token")

    token_b64 = None
    try:
        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
            if sample:
                token_b64 = sample
                break
        if not token_b64:
            raise ValueError(
                f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
            )
    except TimeoutExpiredError:
        raise ValueError(
            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
        ) from None

    token_value = base64.b64decode(token_b64).decode("utf-8")

    # Provider secret (Forklift expects "token" and "insecureSkipVerify")
    provider_secret = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=f"{provider_name}-secret",
        namespace=target_namespace,
        string_data={"token": token_value, "insecureSkipVerify": "true"},
    )

    # Provider CR
    provider = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Provider,
        name=provider_name,
        namespace=target_namespace,
        secret_name=provider_secret.name,
        secret_namespace=provider_secret.namespace,
        url=ocp_admin_client.configuration.host,
        provider_type=Provider.ProviderType.OPENSHIFT,
    )

    return OCPProvider(ocp_resource=provider, fixture_store=fixture_store)
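
A minimal sketch of the bounded-token alternative flagged above, mirroring the `oc create token <sa> -n <ns>` flow the fixture docstring already references. It assumes the test runner has `oc` on PATH with a logged-in admin context; the helper name and the subprocess approach are illustrative, not the project's existing utilities.

import shutil
import subprocess


def request_sa_token(sa_name: str, namespace: str) -> str:
    """Request a short-lived bound token for a ServiceAccount via `oc create token`,
    instead of waiting for a legacy service-account-token Secret to be populated."""
    oc = shutil.which("oc")
    if not oc:
        raise RuntimeError("oc binary not found on PATH")
    result = subprocess.run(
        [oc, "create", "token", sa_name, "-n", namespace],
        capture_output=True,
        text=True,
        check=True,
    )
    token = result.stdout.strip()
    if not token:
        raise ValueError(f"'oc create token' returned no token for {sa_name} in {namespace}")
    return token

The returned token could then go straight into the provider Secret's string_data, removing the TimeoutSampler wait on the token Secret altogether.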
Fragile Assert

VM readiness assertion relies on status.printableStatus being present and equal to RUNNING. In real clusters this field can be temporarily absent or reflect transitional states even when the VM is progressing correctly; this can increase test flakiness. Consider waiting on a more stable readiness condition (or checking status.ready/VMI state if available) and include null-safe access.

def verify_vms_running(
    ocp_admin_client: "DynamicClient",
    prepared_plan: dict[str, Any],
    target_namespace: str,
) -> None:
    """Assert each VM in the plan is Running in the target namespace."""
    for vm_config in prepared_plan["virtual_machines"]:
        vm = VirtualMachine(
            client=ocp_admin_client,
            name=vm_config["name"],
            namespace=target_namespace,
        )
        vm.wait(timeout=300)
        assert vm.instance.status.printableStatus == VirtualMachine.Status.RUNNING, (
            f"VM {vm.name} is not Running after migration. "
            f"Status: {vm.instance.status.printableStatus}"
        )


coderabbitai[bot]
coderabbitai Bot previously approved these changes Feb 12, 2026
@qodo-code-review

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 Security concerns

Sensitive credential handling:
The session fixture creates a real ServiceAccount token, decodes it, and stores it into a new Secret via string_data. This is expected for the test, but it increases the risk of accidental exposure via logs/exceptions/debug output. Ensure no logging prints token_value/Secret manifests, and consider minimizing token lifetime (TokenRequest with short expiration) and/or scoping the provider Secret name/namespace tightly and ensuring teardown always runs.

⚡ Recommended focus areas for review

Token creation

The token-based provider fixture relies on creating a kubernetes.io/service-account-token Secret and waiting for the controller to populate data.token. On newer Kubernetes/OpenShift versions this legacy behavior can be disabled/changed in favor of bounded ServiceAccount tokens (TokenRequest API). Validate this works reliably in the targeted OCP versions, and consider switching to an explicit TokenRequest flow to avoid flakes/timeouts.

@pytest.fixture(scope="session")
def clusterrole_destination_ocp_provider(
    fixture_store: dict[str, Any],
    ocp_admin_client: "DynamicClient",
    session_uuid: str,
    target_namespace: str,
) -> OCPProvider:
    """Create a token-based OCP provider using the existing forklift-migrator-role and a fresh SA.

    Verifies the flow:
      1. Create a fresh ServiceAccount (in target namespace)
      2. Bind it ONLY to the existing ClusterRole forklift-migrator-role (from operator/PR)
      3. Create a token for that SA (equivalent to: oc create token <sa> -n <ns>)
      4. Create Forklift Provider CR using that token

    Does NOT create the ClusterRole; forklift-migrator-role must already exist in the cluster.

    Args:
        fixture_store (dict[str, Any]): Fixture store for resource tracking and teardown.
        ocp_admin_client (DynamicClient): OpenShift DynamicClient for cluster operations.
        session_uuid (str): Unique session identifier for resource naming.
        target_namespace (str): Namespace for provider resources.

    Returns:
        OCPProvider: Token-based OCP provider bound to forklift-migrator-role.

    Raises:
        ValueError: If the SA token is not populated within 60s.
    """
    sa_name = f"{session_uuid}-forklift-migrator-sa"
    binding_name = f"{session_uuid}-forklift-migrator-binding"
    token_secret_name = f"{session_uuid}-clusterrole-token"
    provider_name = f"{session_uuid}-clusterrole-destination-ocp-provider"

    # 1. Create a fresh ServiceAccount in target namespace
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ServiceAccount,
        name=sa_name,
        namespace=target_namespace,
    )

    # 2. Bind it ONLY to the existing forklift-migrator-role (from PR)
    cluster_role_binding = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ClusterRoleBinding,
        name=binding_name,
        cluster_role=FORKLIFT_MIGRATOR_ROLE_NAME,
        subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": target_namespace}],
    )
    cluster_role_binding.wait()

    # Token secret: create Secret with type service-account-token so cluster populates token
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=token_secret_name,
        namespace=target_namespace,
        type="kubernetes.io/service-account-token",
        annotations={"kubernetes.io/service-account.name": sa_name},
    )

    # Wait for token to be populated (controller fills it asynchronously)
    token_secret_ref = Secret(
        client=ocp_admin_client,
        name=token_secret_name,
        namespace=target_namespace,
    )

    def _has_token() -> str | None:
        token_secret_ref.wait()
        return (token_secret_ref.instance.data or {}).get("token")

    token_b64 = None
    try:
        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
            if sample:
                token_b64 = sample
                break
        if not token_b64:
            raise ValueError(
                f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
            )
    except TimeoutExpiredError:
        raise ValueError(
            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
        ) from None

    token_value = base64.b64decode(token_b64).decode("utf-8")

    # Provider secret (Forklift expects "token" and "insecureSkipVerify")
    provider_secret = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=f"{provider_name}-secret",
        namespace=target_namespace,
        string_data={"token": token_value, "insecureSkipVerify": "true"},
    )

    # Provider CR
    provider = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Provider,
        name=provider_name,
        namespace=target_namespace,
        secret_name=provider_secret.name,
        secret_namespace=provider_secret.namespace,
        url=ocp_admin_client.configuration.host,
        provider_type=Provider.ProviderType.OPENSHIFT,
    )

    return OCPProvider(ocp_resource=provider, fixture_store=fixture_store)
Path validation

The VIRTCTL_PATH override returns the provided path without checking that it exists and is executable. This can cause hard-to-debug test failures later, and could also allow running an unintended binary if the env var is misconfigured. Add minimal validation (exists, is file, executable) and raise a clear error.

# if env variable VIRTCTL_PATH is set, use it.
if virtctl_env_path := os.environ.get("VIRTCTL_PATH"):
    return Path(virtctl_env_path)
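
A minimal sketch of the validation suggested above, assuming a hypothetical _virtctl_from_env helper that the virtctl_binary fixture could call and return from when it yields a non-None path; the error messages are illustrative.

import os
from pathlib import Path


def _virtctl_from_env() -> Path | None:
    """Return a validated Path from VIRTCTL_PATH, or None when the variable is unset."""
    virtctl_env_path = os.environ.get("VIRTCTL_PATH")
    if not virtctl_env_path:
        return None
    virtctl_path = Path(virtctl_env_path)
    if not virtctl_path.is_file():
        raise ValueError(f"VIRTCTL_PATH={virtctl_env_path} does not point to an existing file")
    if not os.access(virtctl_path, os.X_OK):
        raise ValueError(f"VIRTCTL_PATH={virtctl_env_path} is not executable")
    return virtctl_path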
None handling

VM status polling reads instance.status.printableStatus directly; during early reconciliation status (or printableStatus) can be missing, which can raise attribute errors and fail the test for the wrong reason. Add defensive getattr/None handling and treat missing status as “not ready yet”.

def verify_vms_running(
    ocp_admin_client: "DynamicClient",
    prepared_plan: dict[str, Any],
    target_namespace: str,
) -> None:
    """Assert each VM in the plan is Running in the target namespace.

    Waits for each VM resource to exist, then polls printableStatus until it
    reaches Running. VMs typically transition through Provisioning -> Starting
    -> Running after migration. The total per-VM budget is VM_RUNNING_TIMEOUT
    seconds, shared between the initial wait for the resource to exist and the
    subsequent status polling.

    Args:
        ocp_admin_client (DynamicClient): OpenShift/Kubernetes client.
        prepared_plan (dict[str, Any]): Plan configuration with virtual_machines list.
        target_namespace (str): Target namespace to check VMs in.

    Raises:
        AssertionError: If any VM does not reach Running status within the timeout.
    """
    for vm_config in prepared_plan["virtual_machines"]:
        vm = VirtualMachine(
            client=ocp_admin_client,
            name=vm_config["name"],
            namespace=target_namespace,
        )
        start = time.monotonic()
        vm.wait(timeout=VM_RUNNING_TIMEOUT)
        elapsed = time.monotonic() - start
        remaining = max(VM_RUNNING_TIMEOUT - elapsed, 10)
        LOGGER.info(f"VM {vm.name} resource exists, waiting for Running status")
        last_status: str | None = None
        try:
            for sample in TimeoutSampler(
                wait_timeout=remaining,
                sleep=VM_RUNNING_POLL_INTERVAL,
                func=lambda _vm=vm: _vm.instance.status.printableStatus,
            ):
                if sample == VirtualMachine.Status.RUNNING:
                    LOGGER.info(f"VM {vm.name} reached Running status")
                    break
                if sample != last_status:
                    LOGGER.info(f"VM {vm.name} status: {sample}, waiting for Running")
                    last_status = sample
        except TimeoutExpiredError:
            raise AssertionError(
                f"VM {vm.name} did not reach Running status within {VM_RUNNING_TIMEOUT}s. "
                f"Last observed status: {last_status}"
            ) from None
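
A hedged sketch of the defensive read, replacing the lambda passed to TimeoutSampler above; attribute names follow the snippet, and any missing field is simply treated as "not ready yet":

def _printable_status(_vm=vm) -> str | None:
    # A missing status or printableStatus returns None instead of raising,
    # so the sampler keeps polling rather than failing the test spuriously.
    status = getattr(_vm.instance, "status", None)
    return getattr(status, "printableStatus", None) if status else None


for sample in TimeoutSampler(
    wait_timeout=remaining,
    sleep=VM_RUNNING_POLL_INTERVAL,
    func=_printable_status,
):
    ...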

@qodo-code-review
Copy link
Copy Markdown

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 Security concerns

Sensitive token handling:
The clusterrole_destination_ocp_provider fixture decodes a ServiceAccount token and stores it in a new Secret (string_data={"token": token_value, ...}). While this is test code, it still creates real credentials in-cluster; ensure the fixture_store teardown reliably deletes both the token-populating Secret and the provider Secret, and avoid logging/printing token values. Additionally, virtctl_binary trusts VIRTCTL_PATH without validating existence/executability, which can allow executing an unintended binary in CI if the environment is compromised.

⚡ Recommended focus areas for review

Token Creation

The provider fixture creates a legacy kubernetes.io/service-account-token Secret and waits for the controller to populate .data.token. Many newer clusters disable auto-generated SA token Secrets in favor of TokenRequest/bound tokens, which can make this fixture flaky or fail entirely. Consider using the TokenRequest API (or the equivalent library support) rather than relying on secret population; a sketch follows the quoted fixture below.

@pytest.fixture(scope="session")
def clusterrole_destination_ocp_provider(
    fixture_store: dict[str, Any],
    ocp_admin_client: "DynamicClient",
    session_uuid: str,
    target_namespace: str,
) -> OCPProvider:
    """Create a token-based OCP provider using the existing forklift-migrator-role and a fresh SA.

    Verifies the flow:
      1. Create a fresh ServiceAccount (in target namespace)
      2. Bind it ONLY to the existing ClusterRole forklift-migrator-role (from operator/PR)
      3. Create a token for that SA (equivalent to: oc create token <sa> -n <ns>)
      4. Create Forklift Provider CR using that token

    Does NOT create the ClusterRole; forklift-migrator-role must already exist in the cluster.

    Args:
        fixture_store (dict[str, Any]): Fixture store for resource tracking and teardown.
        ocp_admin_client (DynamicClient): OpenShift DynamicClient for cluster operations.
        session_uuid (str): Unique session identifier for resource naming.
        target_namespace (str): Namespace for provider resources.

    Returns:
        OCPProvider: Token-based OCP provider bound to forklift-migrator-role.

    Raises:
        ValueError: If the SA token is not populated within 60s.
    """
    sa_name = f"{session_uuid}-forklift-migrator-sa"
    binding_name = f"{session_uuid}-forklift-migrator-binding"
    token_secret_name = f"{session_uuid}-clusterrole-token"
    provider_name = f"{session_uuid}-clusterrole-destination-ocp-provider"

    # 1. Create a fresh ServiceAccount in target namespace
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ServiceAccount,
        name=sa_name,
        namespace=target_namespace,
    )

    # 2. Bind it ONLY to the existing forklift-migrator-role (from PR)
    cluster_role_binding = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=ClusterRoleBinding,
        name=binding_name,
        cluster_role=FORKLIFT_MIGRATOR_ROLE_NAME,
        subjects=[{"kind": "ServiceAccount", "name": sa_name, "namespace": target_namespace}],
    )
    cluster_role_binding.wait()

    # Token secret: create Secret with type service-account-token so cluster populates token
    create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=token_secret_name,
        namespace=target_namespace,
        type="kubernetes.io/service-account-token",
        annotations={"kubernetes.io/service-account.name": sa_name},
    )

    # Wait for token to be populated (controller fills it asynchronously)
    token_secret_ref = Secret(
        client=ocp_admin_client,
        name=token_secret_name,
        namespace=target_namespace,
    )

    def _has_token() -> str | None:
        token_secret_ref.wait()
        return (token_secret_ref.instance.data or {}).get("token")

    token_b64 = None
    try:
        for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
            if sample:
                token_b64 = sample
                break
        if not token_b64:
            raise ValueError(
                f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
            )
    except TimeoutExpiredError:
        raise ValueError(
            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
        ) from None

    token_value = base64.b64decode(token_b64).decode("utf-8")

    # Provider secret (Forklift expects "token" and "insecureSkipVerify")
    provider_secret = create_and_store_resource(
        client=ocp_admin_client,
        fixture_store=fixture_store,
        resource=Secret,
        name=f"{provider_name}-secret",
        namespace=target_namespace,
        string_data={"token": token_value, "insecureSkipVerify": "true"},
    )
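
A minimal TokenRequest-based alternative, sketched with the upstream kubernetes Python client; the audience value, TTL, and the choice of client are assumptions, and the project's own wrappers may expose an equivalent helper:

from kubernetes import client


def request_sa_token(core_v1: client.CoreV1Api, sa_name: str, namespace: str, ttl_seconds: int = 3600) -> str:
    # Ask the API server for a bound token directly instead of waiting for a
    # legacy service-account-token Secret to be populated by the controller.
    token_request = client.AuthenticationV1TokenRequest(
        spec=client.V1TokenRequestSpec(
            audiences=["https://kubernetes.default.svc"],
            expiration_seconds=ttl_seconds,
        )
    )
    response = core_v1.create_namespaced_service_account_token(
        name=sa_name, namespace=namespace, body=token_request
    )
    return response.status.token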
Potential Hang

The polling helper calls token_secret_ref.wait() inside the sampled function without an explicit timeout. If wait() blocks longer than expected (or indefinitely due to API issues), the outer timeout sampler may not be able to enforce the 60s budget. Prefer a non-blocking read or a wait(timeout=...) so the overall timeout is respected; a sketch follows the snippet below.

# Wait for token to be populated (controller fills it asynchronously)
token_secret_ref = Secret(
    client=ocp_admin_client,
    name=token_secret_name,
    namespace=target_namespace,
)

def _has_token() -> str | None:
    token_secret_ref.wait()
    return (token_secret_ref.instance.data or {}).get("token")

token_b64 = None
try:
    for sample in TimeoutSampler(wait_timeout=60, sleep=2, func=_has_token):
        if sample:
            token_b64 = sample
            break
    if not token_b64:
        raise ValueError(
            f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
        )
except TimeoutExpiredError:
    raise ValueError(
        f"Token was not populated in Secret {token_secret_name} for ServiceAccount {sa_name} within 60s"
    ) from None
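
A sketch of a sampled function that lets TimeoutSampler own the 60s budget; it assumes .instance performs a fresh read on each access (otherwise a bounded wait(timeout=...) call would be the alternative):

def _has_token() -> str | None:
    # The Secret was created just above, so skip the unbounded wait() and read
    # the object directly; an unpopulated token returns None and the sampler
    # simply retries until its own timeout expires.
    return (token_secret_ref.instance.data or {}).get("token")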
Docstring Typo

The virtctl_binary docstring has a malformed Returns section (Returns:F), which can break doc rendering and tooling and is confusing for maintainers. Fix the typo to keep the documentation consistent.

def virtctl_binary(ocp_admin_client: "DynamicClient") -> Path:
    """Download and configure virtctl binary from the cluster.

    This fixture ensures virtctl is available in PATH for all tests
    that need to interact with VMs via virtctl commands.

    Uses file locking to handle pytest-xdist parallel execution safely.
    The binary is downloaded to a shared directory that all workers can access.
    The directory includes the cluster version for automatic cache invalidation
    when switching between clusters with different versions.

    Args:
        ocp_admin_client (DynamicClient): OpenShift cluster client for accessing
            the cluster to download the virtctl binary.

    Returns:F
        Path: Path to the downloaded virtctl binary.

    Raises:
        ValueError: If virtctl download fails or binary is not executable.
        PermissionError: If shared directory ownership doesn't match current user.
        PermissionError: If shared directory is a symlink (hijack attempt).
        TimeoutError: If timeout waiting for file lock.


@redhat-qe-bot
Copy link
Copy Markdown

Clean rebase detected — no code changes compared to previous head (17b7664).

@AmenB
Copy link
Copy Markdown
Contributor Author

AmenB commented Apr 16, 2026

/verified

