diff --git a/tests/integration-tests/configs/isolated_regions.yaml b/tests/integration-tests/configs/isolated_regions.yaml index 15a594bf62..6b6743fd28 100644 --- a/tests/integration-tests/configs/isolated_regions.yaml +++ b/tests/integration-tests/configs/isolated_regions.yaml @@ -174,6 +174,7 @@ test-suites: instances: ["c5n.9xlarge"] oss: {{ OSS }} schedulers: {{ SCHEDULERS }} + flags: ["any-efa-instances"] # This test cannot be executed in US isolated regions # because its logic relies on downloading an external # dependency from GitHub, which is not possible diff --git a/tests/integration-tests/conftest.py b/tests/integration-tests/conftest.py index 7e650fb950..2a187c31bc 100644 --- a/tests/integration-tests/conftest.py +++ b/tests/integration-tests/conftest.py @@ -93,7 +93,7 @@ ) from xdist import get_xdist_worker_id -from tests.common.capacity_helpers import resolve_instance_with_capacity +from tests.common.capacity_helpers import get_efa_instance_types, resolve_instance_with_capacity from tests.common.osu_common import PRIVATE_OSES, run_osu_benchmarks from tests.common.schedulers_common import get_scheduler_commands from tests.common.storage.constants import StorageType @@ -688,8 +688,9 @@ def inject_internal_storage_settings(kwargs): def inject_placement_group_settings(vpc_stack, instance, region, kwargs): - if vpc_stack.az_override: - placement_group_name = f"{instance}_placement_group_{vpc_stack.az_override}" + az = vpc_stack.az_override or vpc_stack.default_az_id + if az: + placement_group_name = f"{instance}_placement_group_{az}" try: ec2_client = boto3.client("ec2", region_name=region) ec2_client.describe_placement_groups(GroupNames=[placement_group_name]) @@ -1303,10 +1304,10 @@ def serial_execution_by_instance(request, instance, region, os_platform): @pytest.fixture(autouse=True) def resolve_default_instance(request): - """Resolve default instance types (c5.xlarge / m6g.xlarge) to an alternative with available capacity. + """Reserve capacity for the test instance, substituting a same-spec alternative on ICE. - Uses create_capacity_reservation as a probe — same pattern as _try_reserve_head_node_instance - in test_efa.py. Only activates for the known default instance types; all others pass through. + Dedups against existing reservations for instances larger than ``.xlarge`` and reserves + EFA-capable instances with a placement group. When a substitute is found, ``request.node.funcargs["instance"]`` is updated so that downstream fixtures (architecture, pcluster_config_reader, etc.) and the test itself see the resolved value. @@ -1320,9 +1321,15 @@ def resolve_default_instance(request): region = request.getfixturevalue("region") os_name = request.getfixturevalue("os") vpc_stack = request.getfixturevalue("vpc_stack") + flags = request.getfixturevalue("flags") + alternative_instance_types = [] + if flags and "any-efa-instances" in flags: + alternative_instance_types = get_efa_instance_types(region) az_id = vpc_stack.az_override or vpc_stack.default_az_id - resolved = resolve_instance_with_capacity(region, az_id, instance, os_name) + resolved = resolve_instance_with_capacity( + region, az_id, instance, os_name, alternative_instance_types=alternative_instance_types + ) if resolved != instance: logging.info("Substituted default instance %s -> %s (capacity fallback)", instance, resolved) request.node.funcargs["instance"] = resolved diff --git a/tests/integration-tests/tests/common/capacity_helpers.py b/tests/integration-tests/tests/common/capacity_helpers.py index 3fc9f34da3..65fdb5cfd4 100644 --- a/tests/integration-tests/tests/common/capacity_helpers.py +++ b/tests/integration-tests/tests/common/capacity_helpers.py @@ -13,52 +13,126 @@ from datetime import datetime, timedelta, timezone import boto3 -from utils import get_similar_instance_types +from utils import get_instance_info, get_similar_instance_types -# Default instance types that are subject to capacity fallback. -DEFAULT_INSTANCE_TYPES = {"c5.xlarge", "m6g.xlarge", "m6i.xlarge"} +def _find_existing_reservation(ec2_client, instance_type, az_id, instance_platform, with_placement_group): + """Return True if an active capacity reservation already exists.""" + try: + reservations = ec2_client.describe_capacity_reservations( + Filters=[ + {"Name": "instance-type", "Values": [instance_type]}, + {"Name": "state", "Values": ["active"]}, + ] + )["CapacityReservations"] + except Exception as e: + logging.warning("Could not list existing capacity reservations for %s: %s", instance_type, e) + return False -def resolve_instance_with_capacity(region, az_id, instance_type, os, minutes=50, count=2): - """Try to reserve capacity for *instance_type* in *az_id*, falling back to alternatives on ICE. + for reservation in reservations: + if ( + reservation.get("AvailabilityZoneId") == az_id + and reservation.get("InstancePlatform") == instance_platform + and bool(reservation.get("PlacementGroupArn")) == with_placement_group + ): + logging.info( + "Reusing existing capacity reservation %s for %s in %s (placement_group=%s)", + reservation.get("CapacityReservationId"), + instance_type, + az_id, + with_placement_group, + ) + return True + return False + + +def get_efa_instance_types(region): + """Return EFA-capable instance types.""" + ec2_client = boto3.client("ec2", region_name=region) + paginator = ec2_client.get_paginator("describe_instance_types") + + efa_instances = [] + for page in paginator.paginate( + Filters=[ + {"Name": "network-info.efa-supported", "Values": ["true"]}, + {"Name": "supported-usage-class", "Values": ["on-demand", "spot"]}, + ], + ): + for instance in page["InstanceTypes"]: + vcpus = instance.get("VCpuInfo", {}).get("DefaultVCpus", 0) + efa_instances.append((vcpus, instance["InstanceType"])) + + # Primary sort by vCPU count (cost), secondary sort by name (determinism). + efa_instances.sort(key=lambda item: (item[0], item[1])) - Only activates when *instance_type* is one of the known defaults (c5.xlarge / m6g.xlarge). - For any other instance type the value is returned unchanged. + instance_types = [instance_type for _, instance_type in efa_instances] + logging.info("Retrieved %d EFA-capable instance types in %s: %s", len(instance_types), region, instance_types) + return instance_types - The probe uses ``create_capacity_reservation`` — the same proven pattern used by - ``_try_reserve_head_node_instance`` in test_efa.py. A successful reservation - guarantees capacity for the subsequent cluster launch and auto-expires after - *minutes* minutes. + +def _ensure_cluster_placement_group(ec2_client, name): + """Return the ARN of cluster placement group *name*, creating it if it does not exist.""" + try: + return ec2_client.describe_placement_groups(GroupNames=[name])["PlacementGroups"][0]["GroupArn"] + except Exception: + return ec2_client.create_placement_group(GroupName=name, Strategy="cluster")["PlacementGroup"]["GroupArn"] + + +def resolve_instance_with_capacity( + region, az_id, instance_type, os, minutes=50, count=2, alternative_instance_types=() +): + """Reserve capacity for *instance_type* in *az_id*, falling back to similar instance types. + + A reservation is attempted for every instance type. For instances larger than ``.xlarge`` + (more than 4 vCPUs) an existing matching reservation is reused when present, + avoiding duplicate reservations of expensive capacity; smaller types skip that check + (small reservations shouldn't dedup because multiple tests use the same instance types). + + EFA-capable instances are reserved with a placement group. + Non-EFA instances are reserved without a placement group. Returns the (possibly substituted) instance type string. """ - if instance_type not in DEFAULT_INSTANCE_TYPES: - return instance_type - - candidates = [instance_type] + get_similar_instance_types(instance_type) + alternative_instance_types = alternative_instance_types or get_similar_instance_types(instance_type) + candidates = [instance_type] + alternative_instance_types ec2_client = boto3.client("ec2", region_name=region) + instance_info = get_instance_info(instance_type, region) + vcpus = instance_info.get("VCpuInfo", {}).get("DefaultVCpus", 0) + with_placement_group = instance_info.get("NetworkInfo", {}).get("EfaSupported", False) + dedup = vcpus > 4 instance_platform = "Red Hat Enterprise Linux" if "rhel" in os else "Linux/UNIX" end_date = datetime.now(timezone.utc) + timedelta(minutes=minutes) for candidate in candidates: + # For instances larger than .xlarge, reuse an existing reservation (dedup) before creating one. + if dedup and _find_existing_reservation(ec2_client, candidate, az_id, instance_platform, with_placement_group): + return candidate try: - response = ec2_client.create_capacity_reservation( - InstanceType=candidate, - InstancePlatform=instance_platform, - AvailabilityZoneId=az_id, - InstanceCount=count, - EndDateType="limited", - EndDate=end_date, - Tenancy="default", - ) - cr_id = response["CapacityReservation"]["CapacityReservationId"] + reservation_args = { + "InstanceType": candidate, + "InstancePlatform": instance_platform, + "AvailabilityZoneId": az_id, + "InstanceCount": count, + "EndDateType": "limited", + "EndDate": end_date, + "Tenancy": "default", + } + if with_placement_group: + placement_group_name = f"{candidate}_placement_group_{az_id}" + reservation_args["PlacementGroupArn"] = _ensure_cluster_placement_group( + ec2_client, placement_group_name + ) + cr_id = ec2_client.create_capacity_reservation(**reservation_args)["CapacityReservation"][ + "CapacityReservationId" + ] logging.info( - "Capacity reservation %s created for %s (count=%d) in %s (expires in %d min)", + "Capacity reservation %s created for %s (count=%d) in %s " "(placement_group=%s, expires in %d min)", cr_id, candidate, count, az_id, + with_placement_group, minutes, ) return candidate diff --git a/tests/integration-tests/tests/efa/test_efa.py b/tests/integration-tests/tests/efa/test_efa.py index 21d699e85b..5fe9bd4777 100644 --- a/tests/integration-tests/tests/efa/test_efa.py +++ b/tests/integration-tests/tests/efa/test_efa.py @@ -17,7 +17,7 @@ import xmltodict from assertpy import assert_that, soft_assertions from remote_command_executor import RemoteCommandExecutor -from utils import get_compute_nodes_instance_ids +from utils import get_compute_nodes_instance_ids, get_instance_info from tests.common.assertions import assert_no_errors_in_logs from tests.common.mpi_common import _test_mpi @@ -88,6 +88,7 @@ def _try_reserve_head_node_instance(region, az_id, architecture, os): return None +@pytest.mark.usefixtures("flags") def test_efa( os, region, @@ -107,7 +108,7 @@ def test_efa( Grouped all tests in a single function so that cluster can be reused for all of them. """ head_node_instance = instance - if instance.startswith("p") or instance.startswith("hpc"): + if len(get_instance_info(instance, region)["NetworkInfo"]["NetworkCards"]) > 1: az_id = vpc_stack.az_override or vpc_stack.default_az_id head_node_instance = _try_reserve_head_node_instance(region, az_id, architecture, os) max_queue_size = 2