Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/integration-tests/configs/isolated_regions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ test-suites:
instances: ["c5n.9xlarge"]
oss: {{ OSS }}
schedulers: {{ SCHEDULERS }}
flags: ["any-efa-instances"]
# This test cannot be executed in US isolated regions
# because its logic relies on downloading an external
# dependency from GitHub, which is not possible
Expand Down
21 changes: 14 additions & 7 deletions tests/integration-tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
)
from xdist import get_xdist_worker_id

from tests.common.capacity_helpers import resolve_instance_with_capacity
from tests.common.capacity_helpers import get_efa_instance_types, resolve_instance_with_capacity
from tests.common.osu_common import PRIVATE_OSES, run_osu_benchmarks
from tests.common.schedulers_common import get_scheduler_commands
from tests.common.storage.constants import StorageType
Expand Down Expand Up @@ -688,8 +688,9 @@ def inject_internal_storage_settings(kwargs):


def inject_placement_group_settings(vpc_stack, instance, region, kwargs):
if vpc_stack.az_override:
placement_group_name = f"{instance}_placement_group_{vpc_stack.az_override}"
az = vpc_stack.az_override or vpc_stack.default_az_id
if az:
placement_group_name = f"{instance}_placement_group_{az}"
try:
ec2_client = boto3.client("ec2", region_name=region)
ec2_client.describe_placement_groups(GroupNames=[placement_group_name])
Expand Down Expand Up @@ -1303,10 +1304,10 @@ def serial_execution_by_instance(request, instance, region, os_platform):

@pytest.fixture(autouse=True)
def resolve_default_instance(request):
"""Resolve default instance types (c5.xlarge / m6g.xlarge) to an alternative with available capacity.
"""Reserve capacity for the test instance, substituting a same-spec alternative on ICE.

Uses create_capacity_reservation as a probe — same pattern as _try_reserve_head_node_instance
in test_efa.py. Only activates for the known default instance types; all others pass through.
Dedups against existing reservations for instances larger than ``.xlarge`` and reserves
EFA-capable instances with a placement group.
When a substitute is found, ``request.node.funcargs["instance"]`` is updated so that downstream
fixtures (architecture, pcluster_config_reader, etc.) and the test itself see the resolved value.

Expand All @@ -1320,9 +1321,15 @@ def resolve_default_instance(request):
region = request.getfixturevalue("region")
os_name = request.getfixturevalue("os")
vpc_stack = request.getfixturevalue("vpc_stack")
flags = request.getfixturevalue("flags")
alternative_instance_types = []
if flags and "any-efa-instances" in flags:
alternative_instance_types = get_efa_instance_types(region)

az_id = vpc_stack.az_override or vpc_stack.default_az_id
resolved = resolve_instance_with_capacity(region, az_id, instance, os_name)
resolved = resolve_instance_with_capacity(
region, az_id, instance, os_name, alternative_instance_types=alternative_instance_types
)
if resolved != instance:
logging.info("Substituted default instance %s -> %s (capacity fallback)", instance, resolved)
request.node.funcargs["instance"] = resolved
Expand Down
126 changes: 100 additions & 26 deletions tests/integration-tests/tests/common/capacity_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,126 @@
from datetime import datetime, timedelta, timezone

import boto3
from utils import get_similar_instance_types
from utils import get_instance_info, get_similar_instance_types

# Default instance types that are subject to capacity fallback.
DEFAULT_INSTANCE_TYPES = {"c5.xlarge", "m6g.xlarge", "m6i.xlarge"}

def _find_existing_reservation(ec2_client, instance_type, az_id, instance_platform, with_placement_group):
"""Return True if an active capacity reservation already exists."""
try:
reservations = ec2_client.describe_capacity_reservations(
Filters=[
{"Name": "instance-type", "Values": [instance_type]},
{"Name": "state", "Values": ["active"]},
]
)["CapacityReservations"]
except Exception as e:
logging.warning("Could not list existing capacity reservations for %s: %s", instance_type, e)
return False

def resolve_instance_with_capacity(region, az_id, instance_type, os, minutes=50, count=2):
"""Try to reserve capacity for *instance_type* in *az_id*, falling back to alternatives on ICE.
for reservation in reservations:
if (
reservation.get("AvailabilityZoneId") == az_id
and reservation.get("InstancePlatform") == instance_platform
and bool(reservation.get("PlacementGroupArn")) == with_placement_group
):
logging.info(
"Reusing existing capacity reservation %s for %s in %s (placement_group=%s)",
reservation.get("CapacityReservationId"),
instance_type,
az_id,
with_placement_group,
)
return True
return False


def get_efa_instance_types(region):
"""Return EFA-capable instance types."""
ec2_client = boto3.client("ec2", region_name=region)
paginator = ec2_client.get_paginator("describe_instance_types")

efa_instances = []
for page in paginator.paginate(
Filters=[
{"Name": "network-info.efa-supported", "Values": ["true"]},
{"Name": "supported-usage-class", "Values": ["on-demand", "spot"]},
],
):
for instance in page["InstanceTypes"]:
vcpus = instance.get("VCpuInfo", {}).get("DefaultVCpus", 0)
efa_instances.append((vcpus, instance["InstanceType"]))

# Primary sort by vCPU count (cost), secondary sort by name (determinism).
efa_instances.sort(key=lambda item: (item[0], item[1]))

Only activates when *instance_type* is one of the known defaults (c5.xlarge / m6g.xlarge).
For any other instance type the value is returned unchanged.
instance_types = [instance_type for _, instance_type in efa_instances]
logging.info("Retrieved %d EFA-capable instance types in %s: %s", len(instance_types), region, instance_types)
return instance_types

The probe uses ``create_capacity_reservation`` — the same proven pattern used by
``_try_reserve_head_node_instance`` in test_efa.py. A successful reservation
guarantees capacity for the subsequent cluster launch and auto-expires after
*minutes* minutes.

def _ensure_cluster_placement_group(ec2_client, name):
"""Return the ARN of cluster placement group *name*, creating it if it does not exist."""
try:
return ec2_client.describe_placement_groups(GroupNames=[name])["PlacementGroups"][0]["GroupArn"]
except Exception:
return ec2_client.create_placement_group(GroupName=name, Strategy="cluster")["PlacementGroup"]["GroupArn"]


def resolve_instance_with_capacity(
region, az_id, instance_type, os, minutes=50, count=2, alternative_instance_types=()
):
"""Reserve capacity for *instance_type* in *az_id*, falling back to similar instance types.

A reservation is attempted for every instance type. For instances larger than ``.xlarge``
(more than 4 vCPUs) an existing matching reservation is reused when present,
avoiding duplicate reservations of expensive capacity; smaller types skip that check
(small reservations shouldn't dedup because multiple tests use the same instance types).

EFA-capable instances are reserved with a placement group.
Non-EFA instances are reserved without a placement group.

Returns the (possibly substituted) instance type string.
"""
if instance_type not in DEFAULT_INSTANCE_TYPES:
return instance_type

candidates = [instance_type] + get_similar_instance_types(instance_type)
alternative_instance_types = alternative_instance_types or get_similar_instance_types(instance_type)
candidates = [instance_type] + alternative_instance_types

ec2_client = boto3.client("ec2", region_name=region)
instance_info = get_instance_info(instance_type, region)
vcpus = instance_info.get("VCpuInfo", {}).get("DefaultVCpus", 0)
with_placement_group = instance_info.get("NetworkInfo", {}).get("EfaSupported", False)
dedup = vcpus > 4
instance_platform = "Red Hat Enterprise Linux" if "rhel" in os else "Linux/UNIX"
end_date = datetime.now(timezone.utc) + timedelta(minutes=minutes)

for candidate in candidates:
# For instances larger than .xlarge, reuse an existing reservation (dedup) before creating one.
if dedup and _find_existing_reservation(ec2_client, candidate, az_id, instance_platform, with_placement_group):
return candidate
try:
response = ec2_client.create_capacity_reservation(
InstanceType=candidate,
InstancePlatform=instance_platform,
AvailabilityZoneId=az_id,
InstanceCount=count,
EndDateType="limited",
EndDate=end_date,
Tenancy="default",
)
cr_id = response["CapacityReservation"]["CapacityReservationId"]
reservation_args = {
"InstanceType": candidate,
"InstancePlatform": instance_platform,
"AvailabilityZoneId": az_id,
"InstanceCount": count,
"EndDateType": "limited",
"EndDate": end_date,
"Tenancy": "default",
}
if with_placement_group:
placement_group_name = f"{candidate}_placement_group_{az_id}"
reservation_args["PlacementGroupArn"] = _ensure_cluster_placement_group(
ec2_client, placement_group_name
)
cr_id = ec2_client.create_capacity_reservation(**reservation_args)["CapacityReservation"][
"CapacityReservationId"
]
logging.info(
"Capacity reservation %s created for %s (count=%d) in %s (expires in %d min)",
"Capacity reservation %s created for %s (count=%d) in %s " "(placement_group=%s, expires in %d min)",
cr_id,
candidate,
count,
az_id,
with_placement_group,
minutes,
)
return candidate
Expand Down
5 changes: 3 additions & 2 deletions tests/integration-tests/tests/efa/test_efa.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import xmltodict
from assertpy import assert_that, soft_assertions
from remote_command_executor import RemoteCommandExecutor
from utils import get_compute_nodes_instance_ids
from utils import get_compute_nodes_instance_ids, get_instance_info

from tests.common.assertions import assert_no_errors_in_logs
from tests.common.mpi_common import _test_mpi
Expand Down Expand Up @@ -88,6 +88,7 @@ def _try_reserve_head_node_instance(region, az_id, architecture, os):
return None


@pytest.mark.usefixtures("flags")
def test_efa(
os,
region,
Expand All @@ -107,7 +108,7 @@ def test_efa(
Grouped all tests in a single function so that cluster can be reused for all of them.
"""
head_node_instance = instance
if instance.startswith("p") or instance.startswith("hpc"):
if len(get_instance_info(instance, region)["NetworkInfo"]["NetworkCards"]) > 1:
az_id = vpc_stack.az_override or vpc_stack.default_az_id
head_node_instance = _try_reserve_head_node_instance(region, az_id, architecture, os)
max_queue_size = 2
Expand Down
Loading