Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
{"name": "yolo26x", "version": "v26"},
{"name": "yolo26n-seg", "version": "v26"},
{"name": "yolo26n-pose", "version": "v26"},
{"name": "yolo26n", "version": "v26_nms", "cli_version": "yolov26_nms"},
{"name": "yolov8n-cls", "version": "v8"},
{"name": "yolov8n-seg", "version": "v8"},
{"name": "yolov8n-pose", "version": "v8"},
Expand Down
30 changes: 30 additions & 0 deletions tests/helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,33 @@ def nn_archive_checker(extra_keys_to_check: list = []): # noqa: B006
assert temp_cfg[keys[-1]] == target, (
f"Value `{temp_cfg[keys[-1]]}` at key `{keys}` doesn't match expected value `{target}`"
)


def load_latest_nn_archive_config() -> dict:
"""Load config.json from the most recently exported NNArchive."""
output_dir = "shared_with_container/outputs"
subdirs = [
d for d in os.listdir(output_dir) if os.path.isdir(os.path.join(output_dir, d))
]
assert subdirs, f"No folders found in `{output_dir}`"

subdirs.sort(key=lambda d: os.path.getmtime(os.path.join(output_dir, d)))
latest_subdir = subdirs[-1]
model_output_path = os.path.join(output_dir, latest_subdir)

archive_files = [f for f in os.listdir(model_output_path) if f.endswith(".tar.xz")]
assert len(archive_files) == 1, (
f"Expected 1 .tar.xz file, found {len(archive_files)}: {archive_files}"
)
archive_path = os.path.join(model_output_path, archive_files[0])

with tarfile.open(archive_path, "r:xz") as tar:
file_names = [m.name for m in tar.getmembers() if m.isfile()]
config_files = [name for name in file_names if name.endswith("config.json")]
assert len(config_files) == 1, (
f"Expected 1 config.json file, found {len(config_files)}: {config_files}"
)
config_member = tar.getmember(config_files[0])
config_file = tar.extractfile(config_member)
assert config_file is not None, "Failed to extract config.json"
return json.load(config_file)
101 changes: 101 additions & 0 deletions tests/nnarchive_output_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from __future__ import annotations

from copy import deepcopy

V8_DETECTION_CHECK = {
"name": "yolov8n",
"version": "v8",
"model_outputs": ["output1_yolov6r2", "output2_yolov6r2", "output3_yolov6r2"],
"head_outputs": ["output1_yolov6r2", "output2_yolov6r2", "output3_yolov6r2"],
"yolo_outputs": ["output1_yolov6r2", "output2_yolov6r2", "output3_yolov6r2"],
}

V8_SEG_CHECK = {
"name": "yolov8n-seg",
"version": "v8",
"model_outputs": [
"output1_yolov8",
"output2_yolov8",
"output3_yolov8",
"output1_masks",
"output2_masks",
"output3_masks",
"protos_output",
],
"head_outputs": [
"output1_yolov8",
"output2_yolov8",
"output3_yolov8",
"output1_masks",
"output2_masks",
"output3_masks",
"protos_output",
],
"yolo_outputs": ["output1_yolov8", "output2_yolov8", "output3_yolov8"],
"mask_outputs": ["output1_masks", "output2_masks", "output3_masks"],
}

V8_POSE_CHECK = {
"name": "yolov8n-pose",
"version": "v8",
"model_outputs": [
"output1_yolov8",
"output2_yolov8",
"output3_yolov8",
"kpt_output1",
"kpt_output2",
"kpt_output3",
],
"head_outputs": [
"output1_yolov8",
"output2_yolov8",
"output3_yolov8",
"kpt_output1",
"kpt_output2",
"kpt_output3",
],
"yolo_outputs": ["output1_yolov8", "output2_yolov8", "output3_yolov8"],
"keypoints_outputs": ["kpt_output1", "kpt_output2", "kpt_output3"],
}


def _clone_check(base_case: dict, *, name: str, version: str) -> dict:
case = deepcopy(base_case)
case["name"] = name
case["version"] = version
return case


N_VARIANT_OUTPUT_NAME_CHECKS = [
V8_DETECTION_CHECK,
V8_SEG_CHECK,
V8_POSE_CHECK,
_clone_check(V8_DETECTION_CHECK, name="yolov9t", version="v9"),
_clone_check(V8_DETECTION_CHECK, name="yolov11n", version="v11"),
_clone_check(V8_SEG_CHECK, name="yolov11n-seg", version="v11"),
_clone_check(V8_POSE_CHECK, name="yolov11n-pose", version="v11"),
_clone_check(V8_DETECTION_CHECK, name="yolov12n", version="v12"),
{
"name": "yolo26n",
"version": "v26",
"model_outputs": ["output_yolo26"],
"head_outputs": ["output_yolo26"],
"yolo_outputs": ["output_yolo26"],
},
{
"name": "yolo26n-seg",
"version": "v26",
"model_outputs": ["output_yolo26", "output_masks", "protos_output"],
"head_outputs": ["output_yolo26", "output_masks", "protos_output"],
"yolo_outputs": ["output_yolo26"],
"mask_outputs": ["output_masks"],
},
{
"name": "yolo26n-pose",
"version": "v26",
"model_outputs": ["output_yolo26", "kpt_output"],
"head_outputs": ["output_yolo26", "kpt_output"],
"yolo_outputs": ["output_yolo26"],
"keypoints_outputs": ["kpt_output"],
},
]
76 changes: 74 additions & 2 deletions tests/test_end2end.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@

import pytest
from constants import PRIVATE_TEST_MODELS, SAVE_FOLDER, TEST_MODELS
from helper_functions import download_model, download_private_model, nn_archive_checker
from helper_functions import (
download_model,
download_private_model,
load_latest_nn_archive_config,
nn_archive_checker,
)
from nnarchive_output_checks import N_VARIANT_OUTPUT_NAME_CHECKS

logger = logging.getLogger()
logger.setLevel(logging.INFO)
Expand All @@ -15,7 +21,12 @@
@pytest.mark.parametrize(
"model",
TEST_MODELS,
ids=[model["name"] for model in TEST_MODELS],
ids=[
model.get("cli_version", model["name"])
if model.get("cli_version")
else model["name"]
for model in TEST_MODELS
],
)
def test_cli_conversion(model: dict, test_config: dict, subtests):
"""Tests the whole CLI conversion flow with no extra params specified."""
Expand Down Expand Up @@ -50,6 +61,8 @@ def test_cli_conversion(model: dict, test_config: dict, subtests):
pytest.skip("Weights not present and `download_weights` not set")

command = ["tools", model_path]
if model.get("cli_version"):
command += ["--version", model.get("cli_version")]
if model.get("size"): # edge case when stride=64 is needed
command += ["--imgsz", model.get("size")]

Expand Down Expand Up @@ -79,6 +92,65 @@ def test_cli_conversion(model: dict, test_config: dict, subtests):
nn_archive_checker(extra_keys_to_check=extra_keys_to_check)


@pytest.mark.parametrize(
"model_case",
N_VARIANT_OUTPUT_NAME_CHECKS,
ids=[model_case["name"] for model_case in N_VARIANT_OUTPUT_NAME_CHECKS],
)
def test_n_variant_nnarchive_outputs(model_case: dict, test_config: dict):
"""Checks NNArchive output-related fields for selected variants."""
if (
test_config["test_case"] is not None
and model_case["name"] != test_config["test_case"]
):
pytest.skip(
f"Test case ({model_case['name']}) doesn't match selected test case ({test_config['test_case']})"
)

if (
test_config["yolo_version"] is not None
and model_case["version"] != test_config["yolo_version"]
):
pytest.skip(
f"Model version ({model_case['version']}) doesn't match selected version ({test_config['yolo_version']})."
)

model_path = os.path.join(SAVE_FOLDER, f"{model_case['name']}.pt")
if not os.path.exists(model_path):
if test_config["download_weights"]:
model_path = download_model(model_case["name"], SAVE_FOLDER)
else:
pytest.skip("Weights missing and `download_weights` not set")

command = ["tools", model_path]
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)
if result.returncode != 0:
pytest.fail(f"Exit code: {result.returncode}, Output: {result.stdout}")

cfg = load_latest_nn_archive_config()
output_names = [output["name"] for output in cfg["model"]["outputs"]]
head = cfg["model"]["heads"][0]
metadata = head["metadata"]
head_output_names = head["outputs"]
yolo_output_names = metadata["yolo_outputs"] or []
mask_output_names = metadata["mask_outputs"] or []
keypoint_output_names = metadata["keypoints_outputs"] or []

for key, actual in [
("model_outputs", output_names),
("head_outputs", head_output_names),
("yolo_outputs", yolo_output_names),
("mask_outputs", mask_output_names),
("keypoints_outputs", keypoint_output_names),
]:
for expected_name in model_case.get(key, []):
assert expected_name in actual, (
f"{key}: expected `{expected_name}` for {model_case['name']}, got {actual}"
)


@pytest.mark.parametrize(
"model",
PRIVATE_TEST_MODELS,
Expand Down
3 changes: 3 additions & 0 deletions tools/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
YOLOV11_CONVERSION,
YOLOV12_CONVERSION,
YOLOV26_CONVERSION,
YOLOV26_NMS_CONVERSION,
detect_version,
)

Expand All @@ -50,6 +51,7 @@
YOLOV11_CONVERSION,
YOLOV12_CONVERSION,
YOLOV26_CONVERSION,
YOLOV26_NMS_CONVERSION,
]


Expand Down Expand Up @@ -176,6 +178,7 @@ def convert(
YOLOV9_CONVERSION,
YOLOV11_CONVERSION,
YOLOV12_CONVERSION,
YOLOV26_NMS_CONVERSION,
]:
from tools.yolo.yolov8_exporter import YoloV8Exporter

Expand Down
43 changes: 42 additions & 1 deletion tools/modules/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import os
from datetime import datetime
from typing import List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple

import onnx
import onnxsim
Expand Down Expand Up @@ -101,6 +101,44 @@ def export_onnx(self) -> os.PathLike:

return self.f_onnx

@staticmethod
def _infer_layout_from_shape(shape: List[Any]) -> Optional[str]:
rank = len(shape)
if rank == 4:
return "NCHW"
if rank == 3:
return "NCD"
if rank == 2:
return "NC"
if rank == 1:
return "C"
return None

def get_output_specs(self) -> Dict[str, Dict[str, Any]]:
"""Collect output shape and layout for all ONNX outputs by name."""
if self.f_onnx is None:
raise RuntimeError("ONNX must be exported before reading output specs.")

model_onnx = onnx.load(self.f_onnx)
specs: Dict[str, Dict[str, Any]] = {}

for output in model_onnx.graph.output:
shape: List[Any] = []
for dim in output.type.tensor_type.shape.dim:
if dim.HasField("dim_value"):
shape.append(int(dim.dim_value))
elif dim.HasField("dim_param") and dim.dim_param:
shape.append(dim.dim_param)
else:
shape.append(None)

specs[output.name] = {
"shape": shape,
"layout": self._infer_layout_from_shape(shape),
}

return specs

def make_nn_archive(
self,
class_list: List[str],
Expand Down Expand Up @@ -144,6 +182,7 @@ def make_nn_archive(

if output_kwargs is None:
output_kwargs = {}
output_specs = self.get_output_specs()

archive = ArchiveGenerator(
archive_name=self.model_name,
Expand Down Expand Up @@ -172,6 +211,8 @@ def make_nn_archive(
{
"name": output,
"dtype": DataType.FLOAT32,
"shape": output_specs.get(output, {}).get("shape"),
"layout": output_specs.get(output, {}).get("layout"),
}
for output in self.all_output_names
],
Expand Down
Loading
Loading