From b97ec29a983e35e684a6ddf4688f7c6a68b603b0 Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Tue, 7 Apr 2026 08:24:59 -0400 Subject: [PATCH 1/9] CustomObjectType schema exporter --- netbox_custom_objects/exporter.py | 215 +++++++++ .../migrations/0006_backfill_schema_ids.py | 63 +++ netbox_custom_objects/tests/test_exporter.py | 433 ++++++++++++++++++ .../tests/test_schema_format.py | 190 ++++++++ 4 files changed, 901 insertions(+) create mode 100644 netbox_custom_objects/exporter.py create mode 100644 netbox_custom_objects/migrations/0006_backfill_schema_ids.py create mode 100644 netbox_custom_objects/tests/test_exporter.py diff --git a/netbox_custom_objects/exporter.py b/netbox_custom_objects/exporter.py new file mode 100644 index 0000000..edafc35 --- /dev/null +++ b/netbox_custom_objects/exporter.py @@ -0,0 +1,215 @@ +""" +Exporter for the COT portable schema format (issue #388). + +Converts live CustomObjectType DB state into a schema document dict that +conforms to cot_schema_v1.json. The returned dict can be serialised to YAML +or JSON by the caller. + +Public API +---------- + export_cot(cot) → dict # single COT definition (no top-level wrapper) + export_cots(cots) → dict # full schema document { schema_version, types } + +Notes +----- +- Fields without a schema_id (created before the schema-format feature) are + skipped with a WARNING log entry. They cannot be tracked across installs. +- Attribute values that equal FIELD_DEFAULTS are omitted to keep the output + minimal (round-trip safe: the importer re-applies the same defaults). +- Tombstones (removed_fields) are read from the COT's schema_document. Until + the apply endpoint (#390) is implemented this will always be empty; once + apply is wired up, deletions will be persisted there automatically. +""" + +import logging +import re + +from netbox_custom_objects import constants +from netbox_custom_objects.schema_format import ( + CHOICES_TO_SCHEMA_TYPE, + CUSTOM_OBJECTS_APP_LABEL_SLUG, + FIELD_DEFAULTS, + FIELD_TYPE_ATTRS, + SCHEMA_FORMAT_VERSION, +) + +logger = logging.getLogger(__name__) + +# Matches the generated model name produced by CustomObjectType.get_table_model_name(). +# Capturing group 1 is the numeric COT id. +_TABLE_MODEL_RE = re.compile(r'^table(\d+)model$', re.IGNORECASE) + +# Ordered list of field_base attributes to check for non-default values. +# Type-specific attributes (validation_*, choice_set, related_*) are handled +# separately via FIELD_TYPE_ATTRS. +_BASE_ATTRS = ( + "label", + "description", + "group_name", + "primary", + "required", + "unique", + "default", + "weight", + "search_weight", + "filter_logic", + "ui_visible", + "ui_editable", + "is_cloneable", + "deprecated", + "deprecated_since", + "scheduled_removal", +) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _encode_related_object_type(rot) -> str: + """ + Encode an ObjectType FK as a schema ``related_object_type`` string. + + Built-in NetBox objects → ``"/"`` (e.g. ``"dcim/device"``) + Custom Object Types → ``"custom-objects/"`` + """ + if rot.app_label == constants.APP_LABEL: + m = _TABLE_MODEL_RE.match(rot.model) + if m: + # Avoid a circular import — import here so the module can be loaded + # independently of the full Django app stack in unit tests. + from netbox_custom_objects.models import CustomObjectType # noqa: PLC0415 + cot_id = int(m.group(1)) + slug = CustomObjectType.objects.values_list('slug', flat=True).get(pk=cot_id) + return f"{CUSTOM_OBJECTS_APP_LABEL_SLUG}/{slug}" + return f"{rot.app_label}/{rot.model}" + + +def _export_field(field) -> dict: + """ + Serialise a single ``CustomObjectTypeField`` instance to a schema field dict. + + Raises ``ValueError`` if ``field.schema_id`` is ``None``; callers should + pre-filter or handle this case before calling this function. + """ + if field.schema_id is None: + raise ValueError( + f"Field {field.name!r} on COT {field.custom_object_type_id!r} " + "has no schema_id and cannot be exported." + ) + + schema_type = CHOICES_TO_SCHEMA_TYPE[field.type] + + result = { + "id": field.schema_id, + "name": field.name, + "type": schema_type, + } + + # ── Base attributes (omit when equal to documented defaults) ──────────── + for attr in _BASE_ATTRS: + value = getattr(field, attr) + if value != FIELD_DEFAULTS.get(attr): + result[attr] = value + + # ── Type-specific attributes ───────────────────────────────────────────── + for attr in sorted(FIELD_TYPE_ATTRS[schema_type]): + if attr == "choice_set": + # Required for select/multiselect; always present. + result["choice_set"] = field.choice_set.name + elif attr == "related_object_type": + # Required for object/multiobject; always present. + result["related_object_type"] = _encode_related_object_type( + field.related_object_type + ) + elif attr == "related_object_filter": + value = field.related_object_filter + if value != FIELD_DEFAULTS.get("related_object_filter"): + result["related_object_filter"] = value + elif attr in ("validation_regex", "validation_minimum", "validation_maximum"): + value = getattr(field, attr) + if value != FIELD_DEFAULTS.get(attr): + result[attr] = value + + return result + + +def _removed_fields_from_document(cot) -> list: + """ + Extract the ``removed_fields`` tombstone list for *cot* from its stored + ``schema_document``. Returns an empty list if the document is absent or + does not reference this COT. + """ + if not cot.schema_document: + return [] + for type_def in cot.schema_document.get("types", []): + if type_def.get("name") == cot.name: + return list(type_def.get("removed_fields", [])) + return [] + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def export_cot(cot) -> dict: + """ + Serialise a single ``CustomObjectType`` to its schema definition dict + (the inner object that goes inside the ``types`` list). + + Fields without a ``schema_id`` are skipped; a WARNING is logged for each. + """ + result: dict = { + "name": cot.name, + "slug": cot.slug, + } + + # Optional COT-level attributes — omit when blank/unset. + if cot.version: + result["version"] = cot.version + if cot.verbose_name: + result["verbose_name"] = cot.verbose_name + if cot.verbose_name_plural: + result["verbose_name_plural"] = cot.verbose_name_plural + if cot.description: + result["description"] = cot.description + if cot.group_name: + result["group_name"] = cot.group_name + + # Active + deprecated fields, ordered by schema_id for stable output. + exported_fields = [] + for field in cot.fields.order_by("schema_id"): + if field.schema_id is None: + logger.warning( + "Skipping field %r on COT %r during export: no schema_id assigned. " + "This field was likely created before the schema-format feature was " + "introduced and cannot be tracked portably.", + field.name, + cot.name, + ) + continue + exported_fields.append(_export_field(field)) + + if exported_fields: + result["fields"] = exported_fields + + # Tombstones from previous apply operations. + removed = _removed_fields_from_document(cot) + if removed: + result["removed_fields"] = removed + + return result + + +def export_cots(cots) -> dict: + """ + Serialise one or more ``CustomObjectType`` instances to a complete schema + document dict (``{ schema_version, types }``) that validates against + ``cot_schema_v1.json``. + + *cots* may be any iterable of ``CustomObjectType`` instances. + """ + return { + "schema_version": SCHEMA_FORMAT_VERSION, + "types": [export_cot(cot) for cot in cots], + } \ No newline at end of file diff --git a/netbox_custom_objects/migrations/0006_backfill_schema_ids.py b/netbox_custom_objects/migrations/0006_backfill_schema_ids.py new file mode 100644 index 0000000..e88d9bc --- /dev/null +++ b/netbox_custom_objects/migrations/0006_backfill_schema_ids.py @@ -0,0 +1,63 @@ +""" +Data migration: assign schema_id to existing CustomObjectTypeField rows that +predate the schema-format feature and never received one. + +Strategy +-------- +For each CustomObjectType: + 1. Find the current maximum schema_id already in use (may be 0 if none). + 2. Assign the next available integer to every field with schema_id=NULL, + ordered by the field's primary-key (creation order) for determinism. + 3. Update next_schema_id on the parent CustomObjectType to the highest ID + now assigned, so that future field additions continue from the right value. + +The reverse operation is intentionally a no-op: rolling back would leave the +schema_id column in an indeterminate state, and re-running the forward +migration is safe (it only touches NULL rows). +""" + +from django.db import migrations +from django.db.models import Max + + +# Exposed as a module-level name so tests can import and call it directly +# without going through the migration runner. +def assign_schema_ids(apps, schema_editor): + CustomObjectType = apps.get_model('netbox_custom_objects', 'CustomObjectType') + CustomObjectTypeField = apps.get_model('netbox_custom_objects', 'CustomObjectTypeField') + + for cot in CustomObjectType.objects.all(): + # Highest schema_id already in use for this COT (0 if none). + current_max = ( + CustomObjectTypeField.objects + .filter(custom_object_type=cot, schema_id__isnull=False) + .aggregate(max_id=Max('schema_id'))['max_id'] or 0 + ) + + # Assign the next integers to all unassigned fields, ordered by pk. + next_id = current_max + 1 + for field in ( + CustomObjectTypeField.objects + .filter(custom_object_type=cot, schema_id__isnull=True) + .order_by('id') + ): + CustomObjectTypeField.objects.filter(pk=field.pk).update(schema_id=next_id) + next_id += 1 + + # Sync next_schema_id upward. Never decrease it. + highest_assigned = next_id - 1 # equals current_max when no NULLs existed + if highest_assigned > cot.next_schema_id: + CustomObjectType.objects.filter(pk=cot.pk).update( + next_schema_id=highest_assigned + ) + + +class Migration(migrations.Migration): + + dependencies = [ + ('netbox_custom_objects', '0005_customobjecttype_next_schema_id_and_more'), + ] + + operations = [ + migrations.RunPython(assign_schema_ids, migrations.RunPython.noop), + ] \ No newline at end of file diff --git a/netbox_custom_objects/tests/test_exporter.py b/netbox_custom_objects/tests/test_exporter.py new file mode 100644 index 0000000..7fad716 --- /dev/null +++ b/netbox_custom_objects/tests/test_exporter.py @@ -0,0 +1,433 @@ +""" +Tests for the COT schema exporter (issue #388). + +Covers: +- Minimal and full COT serialisation +- Default-value elision +- Encoding of built-in and custom related_object_type values +- choice_set serialisation +- Deprecated fields included in 'fields' list +- Fields without schema_id are skipped (warning emitted) +- Tombstones read from schema_document +- Multi-type document structure +- Output validates against cot_schema_v1.json +""" +import unittest +from pathlib import Path + +from django.test import TestCase, TransactionTestCase + +from netbox_custom_objects.exporter import export_cot, export_cots +from netbox_custom_objects.models import CustomObjectType, CustomObjectTypeField +from netbox_custom_objects.schema_format import SCHEMA_FORMAT_VERSION + +from .base import CustomObjectsTestCase, TransactionCleanupMixin + +try: + import jsonschema + HAS_JSONSCHEMA = True +except ImportError: + HAS_JSONSCHEMA = False + +_SCHEMA_PATH = ( + Path(__file__).resolve().parent.parent / "schemas" / "cot_schema_v1.json" +) + + +# =========================================================================== +# Helpers +# =========================================================================== + +def _field_by_name(exported_cot: dict, name: str) -> dict: + """Return the exported field dict with the given name, or raise.""" + for f in exported_cot.get("fields", []): + if f["name"] == name: + return f + raise KeyError(f"No exported field named {name!r}") + + +# =========================================================================== +# Tests +# =========================================================================== + +class ExporterBasicTestCase(CustomObjectsTestCase, TestCase): + """Basic structure and minimal-output tests.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type( + name='widget', + slug='widget', + description='A widget', + verbose_name='Widget', + verbose_name_plural='Widgets', + version='1.0.0', + group_name='inventory', + ) + cls.field = cls.create_custom_object_type_field( + cls.cot, + name='label', + type='text', + label='', # will be omitted (default) + required=False, # will be omitted (default) + ) + + def test_top_level_structure(self): + doc = export_cots([self.cot]) + self.assertEqual(doc["schema_version"], SCHEMA_FORMAT_VERSION) + self.assertIsInstance(doc["types"], list) + self.assertEqual(len(doc["types"]), 1) + + def test_cot_required_keys(self): + cot_def = export_cot(self.cot) + self.assertEqual(cot_def["name"], "widget") + self.assertEqual(cot_def["slug"], "widget") + + def test_optional_cot_attrs_included_when_set(self): + cot_def = export_cot(self.cot) + self.assertEqual(cot_def["description"], "A widget") + self.assertEqual(cot_def["verbose_name"], "Widget") + self.assertEqual(cot_def["verbose_name_plural"], "Widgets") + self.assertEqual(cot_def["version"], "1.0.0") + self.assertEqual(cot_def["group_name"], "inventory") + + def test_optional_cot_attrs_omitted_when_blank(self): + bare = self.create_custom_object_type( + name='bare', slug='bare', + description='', verbose_name='', verbose_name_plural='', + version='', group_name='', + ) + cot_def = export_cot(bare) + for key in ("description", "verbose_name", "verbose_name_plural", + "version", "group_name"): + self.assertNotIn(key, cot_def) + + def test_field_required_keys_present(self): + f = _field_by_name(export_cot(self.cot), "label") + self.assertIn("id", f) + self.assertEqual(f["name"], "label") + self.assertEqual(f["type"], "text") + + def test_field_schema_id_matches_model(self): + f = _field_by_name(export_cot(self.cot), "label") + self.assertEqual(f["id"], self.field.schema_id) + + def test_fields_ordered_by_schema_id(self): + cot = self.create_custom_object_type(name='ordered', slug='ordered') + self.create_custom_object_type_field(cot, name='first', type='text') + self.create_custom_object_type_field(cot, name='second', type='text') + self.create_custom_object_type_field(cot, name='third', type='text') + exported = export_cot(cot) + ids = [f["id"] for f in exported["fields"]] + self.assertEqual(ids, sorted(ids)) + + def test_no_fields_key_when_no_exportable_fields(self): + bare = self.create_custom_object_type(name='nofields', slug='no-fields') + cot_def = export_cot(bare) + self.assertNotIn("fields", cot_def) + + def test_multi_type_document(self): + cot2 = self.create_custom_object_type(name='gadget', slug='gadget') + doc = export_cots([self.cot, cot2]) + names = [t["name"] for t in doc["types"]] + self.assertIn("widget", names) + self.assertIn("gadget", names) + + +class ExporterDefaultElisionTestCase(CustomObjectsTestCase, TestCase): + """Default values must be omitted; non-defaults must be present.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='elide', slug='elide') + cls.default_field = cls.create_custom_object_type_field( + cls.cot, name='plain', type='text', + label='', # default — omit + description='', # default — omit + required=False, # default — omit + primary=False, # default — omit + weight=100, # default — omit + search_weight=500, # default — omit + ) + cls.non_default_field = cls.create_custom_object_type_field( + cls.cot, name='special', type='text', + label='Special Label', + description='A description', + required=True, + primary=True, + weight=200, + search_weight=1000, + filter_logic='exact', + ui_visible='if-set', + ui_editable='no', + is_cloneable=True, + ) + + def test_default_attrs_omitted(self): + f = _field_by_name(export_cot(self.cot), "plain") + for key in ("label", "description", "required", "primary", + "weight", "search_weight"): + self.assertNotIn(key, f, f"{key!r} should be omitted when equal to default") + + def test_non_default_attrs_included(self): + f = _field_by_name(export_cot(self.cot), "special") + self.assertEqual(f["label"], "Special Label") + self.assertEqual(f["description"], "A description") + self.assertTrue(f["required"]) + self.assertTrue(f["primary"]) + self.assertEqual(f["weight"], 200) + self.assertEqual(f["search_weight"], 1000) + self.assertEqual(f["filter_logic"], "exact") + self.assertEqual(f["ui_visible"], "if-set") + self.assertEqual(f["ui_editable"], "no") + self.assertTrue(f["is_cloneable"]) + + def test_default_value_omitted_when_null(self): + """field.default=None (the default) must not appear in export.""" + f = _field_by_name(export_cot(self.cot), "plain") + self.assertNotIn("default", f) + + def test_non_null_default_value_included(self): + cot = self.create_custom_object_type(name='withdefault', slug='with-default') + self.create_custom_object_type_field( + cot, name='active', type='boolean', default=True + ) + f = _field_by_name(export_cot(cot), "active") + self.assertIn("default", f) + self.assertTrue(f["default"]) + + +class ExporterFieldTypesTestCase(CustomObjectsTestCase, TestCase): + """Type-specific attribute serialisation.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='typed', slug='typed') + cls.choice_set = cls.create_choice_set(name='Status Choices') + cls.device_ot = cls.get_device_object_type() + + def test_text_field_validation_regex_included(self): + self.create_custom_object_type_field( + self.cot, name='code', type='text', validation_regex=r'^[A-Z]{3}$' + ) + f = _field_by_name(export_cot(self.cot), "code") + self.assertEqual(f["validation_regex"], r'^[A-Z]{3}$') + + def test_text_field_no_regex_omitted(self): + self.create_custom_object_type_field( + self.cot, name='note', type='text' + ) + f = _field_by_name(export_cot(self.cot), "note") + self.assertNotIn("validation_regex", f) + + def test_integer_field_min_max_included(self): + self.create_custom_object_type_field( + self.cot, name='count', type='integer', + validation_minimum=0, validation_maximum=999 + ) + f = _field_by_name(export_cot(self.cot), "count") + self.assertEqual(f["validation_minimum"], 0) + self.assertEqual(f["validation_maximum"], 999) + + def test_integer_field_no_min_max_omitted(self): + self.create_custom_object_type_field( + self.cot, name='qty', type='integer' + ) + f = _field_by_name(export_cot(self.cot), "qty") + self.assertNotIn("validation_minimum", f) + self.assertNotIn("validation_maximum", f) + + def test_select_field_choice_set_name(self): + self.create_custom_object_type_field( + self.cot, name='status', type='select', choice_set=self.choice_set + ) + f = _field_by_name(export_cot(self.cot), "status") + self.assertEqual(f["choice_set"], "Status Choices") + + def test_object_field_builtin_encoding(self): + self.create_custom_object_type_field( + self.cot, name='device', type='object', + related_object_type=self.device_ot + ) + f = _field_by_name(export_cot(self.cot), "device") + self.assertEqual(f["related_object_type"], "dcim/device") + + def test_object_field_custom_cot_encoding(self): + other = self.create_custom_object_type(name='rack', slug='rack') + rack_ot = other.object_type + self.create_custom_object_type_field( + self.cot, name='rack', type='object', + related_object_type=rack_ot + ) + f = _field_by_name(export_cot(self.cot), "rack") + self.assertEqual(f["related_object_type"], "custom-objects/rack") + + def test_object_field_filter_included_when_set(self): + self.create_custom_object_type_field( + self.cot, name='filtered_device', type='object', + related_object_type=self.device_ot, + related_object_filter={"site_id": [1, 2]} + ) + f = _field_by_name(export_cot(self.cot), "filtered_device") + self.assertEqual(f["related_object_filter"], {"site_id": [1, 2]}) + + def test_object_field_filter_omitted_when_null(self): + self.create_custom_object_type_field( + self.cot, name='unfiltered', type='object', + related_object_type=self.device_ot + ) + f = _field_by_name(export_cot(self.cot), "unfiltered") + self.assertNotIn("related_object_filter", f) + + def test_type_specific_attrs_not_leaked_across_types(self): + """A boolean field must not carry validation_regex or choice_set.""" + self.create_custom_object_type_field( + self.cot, name='flag', type='boolean' + ) + f = _field_by_name(export_cot(self.cot), "flag") + for spurious in ("validation_regex", "validation_minimum", + "validation_maximum", "choice_set", + "related_object_type", "related_object_filter"): + self.assertNotIn(spurious, f) + + +class ExporterDeprecationTestCase(CustomObjectsTestCase, TestCase): + """Deprecated fields are exported in 'fields', not removed_fields.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='lifecycle', slug='lifecycle') + cls.create_custom_object_type_field(cls.cot, name='active', type='text') + # Reload from DB and mark deprecated + dep = CustomObjectTypeField.objects.get( + custom_object_type=cls.cot, name='active' + ) + dep.deprecated = True + dep.deprecated_since = '1.1.0' + dep.scheduled_removal = '2.0.0' + dep.save() + + def test_deprecated_field_in_fields_list(self): + cot_def = export_cot(self.cot) + names = [f["name"] for f in cot_def.get("fields", [])] + self.assertIn("active", names) + + def test_deprecated_attrs_exported(self): + f = _field_by_name(export_cot(self.cot), "active") + self.assertTrue(f["deprecated"]) + self.assertEqual(f["deprecated_since"], "1.1.0") + self.assertEqual(f["scheduled_removal"], "2.0.0") + + def test_deprecated_field_not_in_removed_fields(self): + cot_def = export_cot(self.cot) + self.assertNotIn("removed_fields", cot_def) + + +class ExporterSchemaIdTestCase(CustomObjectsTestCase, TransactionTestCase, + TransactionCleanupMixin): + """Fields without schema_id are skipped with a warning.""" + + def test_field_without_schema_id_is_skipped(self): + cot = self.create_custom_object_type(name='skiptest', slug='skip-test') + field = self.create_custom_object_type_field(cot, name='noid', type='text') + # Force schema_id to None after creation (simulating a pre-feature field) + CustomObjectTypeField.objects.filter(pk=field.pk).update(schema_id=None) + + with self.assertLogs('netbox_custom_objects.exporter', level='WARNING') as cm: + cot_def = export_cot(cot) + + self.assertNotIn("fields", cot_def) + self.assertTrue(any("noid" in line for line in cm.output)) + + +class ExporterTombstoneTestCase(CustomObjectsTestCase, TestCase): + """removed_fields are read from schema_document.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='tombstone', slug='tombstone') + cls.cot.schema_document = { + "schema_version": "1", + "types": [ + { + "name": "tombstone", + "slug": "tombstone", + "fields": [], + "removed_fields": [ + {"id": 7, "name": "old_field", "type": "text", + "removed_in": "2.0.0"} + ], + } + ], + } + cls.cot.save(update_fields=["schema_document"]) + + def test_removed_fields_from_schema_document(self): + cot_def = export_cot(self.cot) + self.assertIn("removed_fields", cot_def) + self.assertEqual(len(cot_def["removed_fields"]), 1) + self.assertEqual(cot_def["removed_fields"][0]["id"], 7) + self.assertEqual(cot_def["removed_fields"][0]["name"], "old_field") + + def test_no_removed_fields_key_when_document_empty(self): + bare = self.create_custom_object_type(name='noremoved', slug='no-removed') + cot_def = export_cot(bare) + self.assertNotIn("removed_fields", cot_def) + + +@unittest.skipUnless(HAS_JSONSCHEMA, "jsonschema not installed") +class ExporterSchemaValidationTestCase(CustomObjectsTestCase, TestCase): + """Exported documents must validate against cot_schema_v1.json.""" + + _validator = None + + @classmethod + def setUpClass(cls): + super().setUpClass() + import json + raw = json.loads(_SCHEMA_PATH.read_text()) + cls._validator = jsonschema.Draft202012Validator(raw) + + def _assert_valid(self, doc): + errors = list(self._validator.iter_errors(doc)) + if errors: + self.fail( + f"Schema validation failed:\n" + + "\n".join(f" {e.json_path}: {e.message}" for e in errors) + ) + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type( + name='schvalid', slug='sch-valid', version='1.0.0' + ) + cls.choice_set = cls.create_choice_set(name='Sch Valid Choices') + cls.device_ot = cls.get_device_object_type() + + cls.create_custom_object_type_field( + cls.cot, name='label', type='text', + required=True, primary=True + ) + cls.create_custom_object_type_field( + cls.cot, name='count', type='integer', + validation_minimum=0 + ) + cls.create_custom_object_type_field( + cls.cot, name='status', type='select', + choice_set=cls.choice_set + ) + cls.create_custom_object_type_field( + cls.cot, name='device', type='object', + related_object_type=cls.device_ot + ) + + def test_single_cot_validates(self): + self._assert_valid(export_cots([self.cot])) + + def test_minimal_cot_validates(self): + bare = self.create_custom_object_type(name='barevalid', slug='bare-valid') + self._assert_valid(export_cots([bare])) + + def test_multi_type_document_validates(self): + cot2 = self.create_custom_object_type(name='second', slug='second') + self._assert_valid(export_cots([self.cot, cot2])) \ No newline at end of file diff --git a/netbox_custom_objects/tests/test_schema_format.py b/netbox_custom_objects/tests/test_schema_format.py index bfaa536..fe39a0a 100644 --- a/netbox_custom_objects/tests/test_schema_format.py +++ b/netbox_custom_objects/tests/test_schema_format.py @@ -165,6 +165,196 @@ def test_deprecation_version_strings_accept_semver(self): self.assertEqual(field.deprecated_since, '1.0.0-alpha.1+build.42') +# =========================================================================== +# Backfill migration logic tests (TransactionTestCase — uses DDL) +# =========================================================================== + +class SchemaIdBackfillTestCase( + TransactionCleanupMixin, CustomObjectsTestCase, TransactionTestCase +): + """ + Tests for the 0006_backfill_schema_ids migration logic. + + Rather than exercising the migration runner itself (which would require + replaying the full migration history), these tests call the same backfill + function directly and verify its behaviour against real model instances. + """ + + @staticmethod + def _run_backfill(): + """Execute the backfill function directly against the live DB.""" + import importlib + mod = importlib.import_module( + 'netbox_custom_objects.migrations.0006_backfill_schema_ids' + ) + # The function accepts (apps, schema_editor) but only uses apps.get_model(). + # We pass a lightweight shim that delegates to the real models. + class _AppsShim: + @staticmethod + def get_model(app_label, model_name): + from django.apps import apps + return apps.get_model(app_label, model_name) + + mod.assign_schema_ids(_AppsShim(), None) + + # ------------------------------------------------------------------ + # Core backfill behaviour + # ------------------------------------------------------------------ + + def test_fields_without_schema_id_are_assigned(self): + cot = self.create_custom_object_type(name='bf1', slug='bf-1') + f1 = self.create_custom_object_type_field(cot, name='alpha', type='text') + f2 = self.create_custom_object_type_field(cot, name='beta', type='text') + # Force both to NULL (simulate pre-feature state) + CustomObjectTypeField.objects.filter( + custom_object_type=cot + ).update(schema_id=None) + + self._run_backfill() + + f1.refresh_from_db() + f2.refresh_from_db() + self.assertIsNotNone(f1.schema_id) + self.assertIsNotNone(f2.schema_id) + + def test_assigned_ids_are_sequential_from_one(self): + cot = self.create_custom_object_type(name='bf2', slug='bf-2') + fields = [ + self.create_custom_object_type_field(cot, name=f'f{i}', type='text') + for i in range(1, 4) + ] + CustomObjectTypeField.objects.filter( + custom_object_type=cot + ).update(schema_id=None) + + self._run_backfill() + + ids = sorted( + CustomObjectTypeField.objects.filter(custom_object_type=cot) + .values_list('schema_id', flat=True) + ) + self.assertEqual(ids, [1, 2, 3]) + + def test_existing_schema_ids_are_not_changed(self): + cot = self.create_custom_object_type(name='bf3', slug='bf-3') + f_existing = self.create_custom_object_type_field(cot, name='kept', type='text') + existing_id = f_existing.schema_id # set by auto-assign + self.assertIsNotNone(existing_id) + + # Add a second field and force it to NULL + f_new = self.create_custom_object_type_field(cot, name='new_one', type='text') + CustomObjectTypeField.objects.filter(pk=f_new.pk).update(schema_id=None) + + self._run_backfill() + + f_existing.refresh_from_db() + f_new.refresh_from_db() + self.assertEqual(f_existing.schema_id, existing_id, "Pre-existing ID must not change") + self.assertIsNotNone(f_new.schema_id) + self.assertGreater(f_new.schema_id, existing_id, "New ID must follow existing max") + + def test_backfill_continues_from_existing_max(self): + """New IDs start after the highest already-assigned ID.""" + cot = self.create_custom_object_type(name='bf4', slug='bf-4') + # Manually assign schema_id=5 to simulate a gap + f1 = self.create_custom_object_type_field(cot, name='five', type='text') + CustomObjectTypeField.objects.filter(pk=f1.pk).update(schema_id=5) + + f2 = self.create_custom_object_type_field(cot, name='null_one', type='text') + CustomObjectTypeField.objects.filter(pk=f2.pk).update(schema_id=None) + + self._run_backfill() + + f2.refresh_from_db() + self.assertEqual(f2.schema_id, 6) + + def test_next_schema_id_updated_on_cot(self): + from netbox_custom_objects.models import CustomObjectType + cot = self.create_custom_object_type(name='bf5', slug='bf-5') + for i in range(1, 4): + self.create_custom_object_type_field(cot, name=f'g{i}', type='text') + # Force all to NULL and reset counter + CustomObjectTypeField.objects.filter( + custom_object_type=cot + ).update(schema_id=None) + CustomObjectType.objects.filter(pk=cot.pk).update(next_schema_id=0) + + self._run_backfill() + + cot.refresh_from_db() + self.assertEqual(cot.next_schema_id, 3) + + def test_next_schema_id_never_decreases(self): + """If next_schema_id is already high, the backfill must not lower it.""" + from netbox_custom_objects.models import CustomObjectType + cot = self.create_custom_object_type(name='bf6', slug='bf-6') + f = self.create_custom_object_type_field(cot, name='only', type='text') + CustomObjectTypeField.objects.filter(pk=f.pk).update(schema_id=None) + # Artificially set next_schema_id to a large value + CustomObjectType.objects.filter(pk=cot.pk).update(next_schema_id=99) + + self._run_backfill() + + cot.refresh_from_db() + self.assertGreaterEqual(cot.next_schema_id, 99) + + def test_ids_unique_within_cot(self): + cot = self.create_custom_object_type(name='bf7', slug='bf-7') + for i in range(1, 6): + self.create_custom_object_type_field(cot, name=f'h{i}', type='text') + CustomObjectTypeField.objects.filter( + custom_object_type=cot + ).update(schema_id=None) + + self._run_backfill() + + ids = list( + CustomObjectTypeField.objects.filter(custom_object_type=cot) + .values_list('schema_id', flat=True) + ) + self.assertEqual(len(ids), len(set(ids)), "All schema_ids must be unique within COT") + + def test_ids_scoped_per_cot(self): + """Two different COTs both start their IDs from 1.""" + cotA = self.create_custom_object_type(name='bfA', slug='bf-a') + cotB = self.create_custom_object_type(name='bfB', slug='bf-b') + fa = self.create_custom_object_type_field(cotA, name='x', type='text') + fb = self.create_custom_object_type_field(cotB, name='x', type='text') + CustomObjectTypeField.objects.filter(pk__in=[fa.pk, fb.pk]).update(schema_id=None) + from netbox_custom_objects.models import CustomObjectType + CustomObjectType.objects.filter(pk__in=[cotA.pk, cotB.pk]).update(next_schema_id=0) + + self._run_backfill() + + fa.refresh_from_db() + fb.refresh_from_db() + self.assertEqual(fa.schema_id, 1) + self.assertEqual(fb.schema_id, 1) + + def test_idempotent(self): + """Running the backfill twice must not change any already-assigned IDs.""" + cot = self.create_custom_object_type(name='bfIdem', slug='bf-idem') + for i in range(1, 4): + self.create_custom_object_type_field(cot, name=f'i{i}', type='text') + CustomObjectTypeField.objects.filter( + custom_object_type=cot + ).update(schema_id=None) + + self._run_backfill() + ids_after_first = list( + CustomObjectTypeField.objects.filter(custom_object_type=cot) + .order_by('id').values_list('schema_id', flat=True) + ) + + self._run_backfill() + ids_after_second = list( + CustomObjectTypeField.objects.filter(custom_object_type=cot) + .order_by('id').values_list('schema_id', flat=True) + ) + + self.assertEqual(ids_after_first, ids_after_second) + + # =========================================================================== # schema_document and version field tests (plain TestCase — no DDL needed) # =========================================================================== From 53e41f103e5deb0344f351bb251a2651d4b65c79 Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Tue, 7 Apr 2026 08:34:22 -0400 Subject: [PATCH 2/9] Ruff fixes --- netbox_custom_objects/exporter.py | 2 +- .../migrations/0006_backfill_schema_ids.py | 2 +- netbox_custom_objects/tests/test_exporter.py | 6 +++--- netbox_custom_objects/tests/test_schema_format.py | 3 ++- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/netbox_custom_objects/exporter.py b/netbox_custom_objects/exporter.py index edafc35..ca914a0 100644 --- a/netbox_custom_objects/exporter.py +++ b/netbox_custom_objects/exporter.py @@ -212,4 +212,4 @@ def export_cots(cots) -> dict: return { "schema_version": SCHEMA_FORMAT_VERSION, "types": [export_cot(cot) for cot in cots], - } \ No newline at end of file + } diff --git a/netbox_custom_objects/migrations/0006_backfill_schema_ids.py b/netbox_custom_objects/migrations/0006_backfill_schema_ids.py index e88d9bc..8be00e2 100644 --- a/netbox_custom_objects/migrations/0006_backfill_schema_ids.py +++ b/netbox_custom_objects/migrations/0006_backfill_schema_ids.py @@ -60,4 +60,4 @@ class Migration(migrations.Migration): operations = [ migrations.RunPython(assign_schema_ids, migrations.RunPython.noop), - ] \ No newline at end of file + ] diff --git a/netbox_custom_objects/tests/test_exporter.py b/netbox_custom_objects/tests/test_exporter.py index 7fad716..42464e9 100644 --- a/netbox_custom_objects/tests/test_exporter.py +++ b/netbox_custom_objects/tests/test_exporter.py @@ -18,7 +18,7 @@ from django.test import TestCase, TransactionTestCase from netbox_custom_objects.exporter import export_cot, export_cots -from netbox_custom_objects.models import CustomObjectType, CustomObjectTypeField +from netbox_custom_objects.models import CustomObjectTypeField from netbox_custom_objects.schema_format import SCHEMA_FORMAT_VERSION from .base import CustomObjectsTestCase, TransactionCleanupMixin @@ -392,7 +392,7 @@ def _assert_valid(self, doc): errors = list(self._validator.iter_errors(doc)) if errors: self.fail( - f"Schema validation failed:\n" + "Schema validation failed:\n" + "\n".join(f" {e.json_path}: {e.message}" for e in errors) ) @@ -430,4 +430,4 @@ def test_minimal_cot_validates(self): def test_multi_type_document_validates(self): cot2 = self.create_custom_object_type(name='second', slug='second') - self._assert_valid(export_cots([self.cot, cot2])) \ No newline at end of file + self._assert_valid(export_cots([self.cot, cot2])) diff --git a/netbox_custom_objects/tests/test_schema_format.py b/netbox_custom_objects/tests/test_schema_format.py index fe39a0a..bd296b3 100644 --- a/netbox_custom_objects/tests/test_schema_format.py +++ b/netbox_custom_objects/tests/test_schema_format.py @@ -189,6 +189,7 @@ def _run_backfill(): ) # The function accepts (apps, schema_editor) but only uses apps.get_model(). # We pass a lightweight shim that delegates to the real models. + class _AppsShim: @staticmethod def get_model(app_label, model_name): @@ -219,7 +220,7 @@ def test_fields_without_schema_id_are_assigned(self): def test_assigned_ids_are_sequential_from_one(self): cot = self.create_custom_object_type(name='bf2', slug='bf-2') - fields = [ + [ self.create_custom_object_type_field(cot, name=f'f{i}', type='text') for i in range(1, 4) ] From 363826cd5f1f0e59ac755df74348fedc2ede7d52 Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Tue, 7 Apr 2026 08:56:37 -0400 Subject: [PATCH 3/9] Fix MRO and add validation/notes --- netbox_custom_objects/exporter.py | 11 ++++++++++- netbox_custom_objects/tests/test_exporter.py | 5 +++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/netbox_custom_objects/exporter.py b/netbox_custom_objects/exporter.py index ca914a0..e9ec4d7 100644 --- a/netbox_custom_objects/exporter.py +++ b/netbox_custom_objects/exporter.py @@ -115,7 +115,11 @@ def _export_field(field) -> dict: # ── Type-specific attributes ───────────────────────────────────────────── for attr in sorted(FIELD_TYPE_ATTRS[schema_type]): if attr == "choice_set": - # Required for select/multiselect; always present. + # Required for select/multiselect; validate. + if field.choice_set is None: + raise ValueError( + f"Field {field.name!r} is type {schema_type!r} but has no choice_set assigned." + ) result["choice_set"] = field.choice_set.name elif attr == "related_object_type": # Required for object/multiobject; always present. @@ -142,6 +146,9 @@ def _removed_fields_from_document(cot) -> list: """ if not cot.schema_document: return [] + # NOTE: matches by COT name. If the COT is renamed after tombstones + # are persisted, they will not be found. This will be addressed when + # #390 (apply) is implemented and tombstones are managed more explicitly. for type_def in cot.schema_document.get("types", []): if type_def.get("name") == cot.name: return list(type_def.get("removed_fields", [])) @@ -209,6 +216,8 @@ def export_cots(cots) -> dict: *cots* may be any iterable of ``CustomObjectType`` instances. """ + if not cots: + raise ValueError("Minimum 1 Custom Object Type required.") return { "schema_version": SCHEMA_FORMAT_VERSION, "types": [export_cot(cot) for cot in cots], diff --git a/netbox_custom_objects/tests/test_exporter.py b/netbox_custom_objects/tests/test_exporter.py index 42464e9..aa71f6a 100644 --- a/netbox_custom_objects/tests/test_exporter.py +++ b/netbox_custom_objects/tests/test_exporter.py @@ -323,8 +323,9 @@ def test_deprecated_field_not_in_removed_fields(self): self.assertNotIn("removed_fields", cot_def) -class ExporterSchemaIdTestCase(CustomObjectsTestCase, TransactionTestCase, - TransactionCleanupMixin): +class ExporterSchemaIdTestCase( + TransactionCleanupMixin, CustomObjectsTestCase, TransactionTestCase +): """Fields without schema_id are skipped with a warning.""" def test_field_without_schema_id_is_skipped(self): From b1c25f50211058ab3e3f39fea209e9babb59c92b Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Tue, 7 Apr 2026 16:49:30 -0400 Subject: [PATCH 4/9] Schema comparator tool --- netbox_custom_objects/comparator.py | 373 +++++++++++++ .../tests/test_comparator.py | 510 ++++++++++++++++++ 2 files changed, 883 insertions(+) create mode 100644 netbox_custom_objects/comparator.py create mode 100644 netbox_custom_objects/tests/test_comparator.py diff --git a/netbox_custom_objects/comparator.py b/netbox_custom_objects/comparator.py new file mode 100644 index 0000000..2e6d032 --- /dev/null +++ b/netbox_custom_objects/comparator.py @@ -0,0 +1,373 @@ +""" +COT state comparator / diff engine (issue #387). + +Compares an incoming schema document (produced by the exporter or hand-authored) +against the live DB state of each referenced CustomObjectType and returns a +structured diff that the upgrade executor (#389) can consume. + +Public API +---------- + diff_document(schema_doc) → list[COTDiff] + diff_cot(type_def) → COTDiff + +Data model +---------- + COTDiff — top-level result for one COT + .cot_changes — {attr: (db_val, schema_val)} for COT-level attribute changes + .field_changes — list[FieldChange] + .warnings — non-fatal issues (e.g. untracked DB fields) + + FieldChange + .op — FieldOp.ADD | REMOVE | ALTER + .schema_id — the stable numeric field identifier + .db_name — current DB field name (None for ADD) + .schema_def — raw schema field dict (for ADD; also available for ALTER) + .changed_attrs — {attr: (db_val, schema_val)} — populated for ALTER + +Notes +----- +- Fields are matched by schema_id. Fields in the DB without a schema_id cannot + be tracked and are reported as warnings, not as removals. +- A REMOVE operation is only emitted when the field's schema_id appears in the + schema's removed_fields tombstone list. A field absent from both schema.fields + and schema.removed_fields is ambiguous (possibly added outside the workflow) + and generates a warning instead. +- Type changes are included in changed_attrs but are not validated here; the + executor decides whether to allow or reject them. +- related_object_type values are compared in their encoded schema form + ("app_label/model" or "custom-objects/") so the diff output is + round-trip compatible with the schema format. +""" + +import logging +import re +from dataclasses import dataclass, field +from enum import Enum + +from netbox_custom_objects import constants +from netbox_custom_objects.schema_format import ( + CUSTOM_OBJECTS_APP_LABEL_SLUG, + FIELD_DEFAULTS, + FIELD_TYPE_ATTRS, + SCHEMA_TYPE_TO_CHOICES, +) + +logger = logging.getLogger(__name__) + +# Matches TableModel (generated model names for custom object types). +_TABLE_MODEL_RE = re.compile(r'^table(\d+)model$', re.IGNORECASE) + +# Ordered base attributes compared between DB and schema for each field. +# Does NOT include 'name' or 'type' — those are handled separately. +_FIELD_BASE_ATTRS = ( + "label", + "description", + "group_name", + "primary", + "required", + "unique", + "default", + "weight", + "search_weight", + "filter_logic", + "ui_visible", + "ui_editable", + "is_cloneable", + "deprecated", + "deprecated_since", + "scheduled_removal", +) + +# COT-level attributes that may change between schema versions. +# Each maps to its schema-absent default (empty string). +_COT_ATTRS = ( + "name", + "version", + "verbose_name", + "verbose_name_plural", + "description", + "group_name", +) + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +class FieldOp(Enum): + ADD = "add" # field exists in schema but not in DB + REMOVE = "remove" # field tombstoned in schema; still exists in DB + ALTER = "alter" # field in both; has differences (may include rename/type change) + + +@dataclass +class FieldChange: + """A single field-level operation within a COTDiff.""" + op: FieldOp + schema_id: int + db_name: str | None # current DB name; None for ADD + schema_def: dict # the schema field dict + changed_attrs: dict[str, tuple] = field(default_factory=dict) + # {attr: (db_value, schema_value)} — populated for ALTER; + # includes "name" if renamed, "type" if type differs. + + @property + def is_rename(self) -> bool: + return "name" in self.changed_attrs + + @property + def is_type_change(self) -> bool: + return "type" in self.changed_attrs + + +@dataclass +class COTDiff: + """All changes needed to bring one COT in sync with a schema definition.""" + name: str # from schema + slug: str # from schema (used as lookup key) + is_new: bool # True → COT does not yet exist in DB + cot_changes: dict[str, tuple] = field(default_factory=dict) + # {attr: (db_val, schema_val)} for COT-level attribute differences + field_changes: list[FieldChange] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + + @property + def has_changes(self) -> bool: + return bool(self.cot_changes or self.field_changes) + + @property + def has_destructive_changes(self) -> bool: + """True if any field will be dropped from the DB.""" + return any(fc.op is FieldOp.REMOVE for fc in self.field_changes) + + @property + def adds(self) -> list[FieldChange]: + return [fc for fc in self.field_changes if fc.op is FieldOp.ADD] + + @property + def removes(self) -> list[FieldChange]: + return [fc for fc in self.field_changes if fc.op is FieldOp.REMOVE] + + @property + def alters(self) -> list[FieldChange]: + return [fc for fc in self.field_changes if fc.op is FieldOp.ALTER] + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _encode_rot(rot) -> str: + """ + Encode a related ObjectType FK as a schema ``related_object_type`` string. + Mirrors the logic in exporter._encode_related_object_type. + """ + if rot.app_label == constants.APP_LABEL: + m = _TABLE_MODEL_RE.match(rot.model) + if m: + from netbox_custom_objects.models import CustomObjectType # noqa: PLC0415 + cot_id = int(m.group(1)) + slug = CustomObjectType.objects.values_list('slug', flat=True).get(pk=cot_id) + return f"{CUSTOM_OBJECTS_APP_LABEL_SLUG}/{slug}" + return f"{rot.app_label}/{rot.model}" + + +def _compare_field_attrs(db_field, schema_field: dict) -> dict[str, tuple]: + """ + Return ``{attr: (db_value, schema_value)}`` for every attribute that + differs between *db_field* (a ``CustomObjectTypeField`` instance) and + *schema_field* (the schema dict for that field). + """ + changes: dict[str, tuple] = {} + schema_type = schema_field["type"] # schema string, e.g. "text" + + # ── name ──────────────────────────────────────────────────────────────── + schema_name = schema_field["name"] + if db_field.name != schema_name: + changes["name"] = (db_field.name, schema_name) + + # ── type ───────────────────────────────────────────────────────────────── + expected_choice = SCHEMA_TYPE_TO_CHOICES[schema_type] + if db_field.type != expected_choice: + changes["type"] = (db_field.type, expected_choice) + + # ── base scalar attributes ─────────────────────────────────────────────── + for attr in _FIELD_BASE_ATTRS: + db_val = getattr(db_field, attr) + schema_val = schema_field.get(attr, FIELD_DEFAULTS.get(attr)) + if db_val != schema_val: + changes[attr] = (db_val, schema_val) + + # ── type-specific attributes ───────────────────────────────────────────── + type_specific = FIELD_TYPE_ATTRS.get(schema_type, set()) + + if "validation_regex" in type_specific: + dv = db_field.validation_regex or "" + sv = schema_field.get("validation_regex", "") + if dv != sv: + changes["validation_regex"] = (dv, sv) + + if "validation_minimum" in type_specific: + dv = db_field.validation_minimum + sv = schema_field.get("validation_minimum", None) + if dv != sv: + changes["validation_minimum"] = (dv, sv) + + if "validation_maximum" in type_specific: + dv = db_field.validation_maximum + sv = schema_field.get("validation_maximum", None) + if dv != sv: + changes["validation_maximum"] = (dv, sv) + + if "choice_set" in type_specific: + dv = db_field.choice_set.name if db_field.choice_set_id else None + sv = schema_field.get("choice_set") + if dv != sv: + changes["choice_set"] = (dv, sv) + + if "related_object_type" in type_specific: + dv = _encode_rot(db_field.related_object_type) if db_field.related_object_type_id else None + sv = schema_field.get("related_object_type") + if dv != sv: + changes["related_object_type"] = (dv, sv) + + if "related_object_filter" in type_specific: + dv = db_field.related_object_filter + sv = schema_field.get("related_object_filter", None) + if dv != sv: + changes["related_object_filter"] = (dv, sv) + + return changes + + +def _compare_cot_attrs(cot, type_def: dict) -> dict[str, tuple]: + """ + Return ``{attr: (db_value, schema_value)}`` for COT-level attributes + that differ. Absent schema keys are treated as empty string (same + convention as the exporter). + """ + changes: dict[str, tuple] = {} + for attr in _COT_ATTRS: + db_val = getattr(cot, attr) or "" + schema_val = type_def.get(attr) or "" + if db_val != schema_val: + changes[attr] = (db_val, schema_val) + return changes + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def diff_cot(type_def: dict) -> COTDiff: + """ + Compare one COT schema definition against the live DB state. + + *type_def* is a single entry from the ``types`` list of a schema document + (as produced by :func:`export_cot` or hand-authored). + + Returns a :class:`COTDiff` describing what would need to change. + """ + from netbox_custom_objects.models import CustomObjectType # noqa: PLC0415 + + slug = type_def["slug"] + name = type_def["name"] + + # ── COT existence check ────────────────────────────────────────────────── + try: + cot = CustomObjectType.objects.get(slug=slug) + except CustomObjectType.DoesNotExist: + # Brand-new COT — every schema field is an ADD. + field_changes = [ + FieldChange( + op=FieldOp.ADD, + schema_id=sf["id"], + db_name=None, + schema_def=sf, + ) + for sf in type_def.get("fields", []) + ] + return COTDiff( + name=name, + slug=slug, + is_new=True, + field_changes=field_changes, + ) + + diff = COTDiff(name=name, slug=slug, is_new=False) + + # ── COT-level attribute diff ───────────────────────────────────────────── + diff.cot_changes = _compare_cot_attrs(cot, type_def) + + # ── Build lookup indexes ───────────────────────────────────────────────── + schema_fields: dict[int, dict] = { + sf["id"]: sf for sf in type_def.get("fields", []) + } + tombstoned_ids: set[int] = { + rf["id"] for rf in type_def.get("removed_fields", []) + } + db_fields: dict[int, object] = { + f.schema_id: f + for f in cot.fields.filter(schema_id__isnull=False) + .select_related("choice_set", "related_object_type") + } + + # Warn about DB fields with no schema_id (invisible to the diff). + untracked = cot.fields.filter(schema_id__isnull=True) + for f in untracked: + diff.warnings.append( + f"Field {f.name!r} (pk={f.pk}) has no schema_id and cannot be " + "tracked by the schema diff. It will not be affected by apply operations." + ) + + # ── Schema fields → ADD or ALTER ───────────────────────────────────────── + for schema_id, schema_field in schema_fields.items(): + if schema_id in db_fields: + db_field = db_fields[schema_id] + changed = _compare_field_attrs(db_field, schema_field) + if changed: + diff.field_changes.append(FieldChange( + op=FieldOp.ALTER, + schema_id=schema_id, + db_name=db_field.name, + schema_def=schema_field, + changed_attrs=changed, + )) + else: + diff.field_changes.append(FieldChange( + op=FieldOp.ADD, + schema_id=schema_id, + db_name=None, + schema_def=schema_field, + )) + + # ── DB fields absent from schema → REMOVE or warn ──────────────────────── + for schema_id, db_field in db_fields.items(): + if schema_id in schema_fields: + continue # already handled above + if schema_id in tombstoned_ids: + diff.field_changes.append(FieldChange( + op=FieldOp.REMOVE, + schema_id=schema_id, + db_name=db_field.name, + schema_def={}, + )) + else: + diff.warnings.append( + f"Field {db_field.name!r} (schema_id={schema_id}) exists in the DB " + "but is absent from both schema.fields and schema.removed_fields. " + "It was likely added outside the schema workflow and will not be " + "affected by apply operations." + ) + + return diff + + +def diff_document(schema_doc: dict) -> list[COTDiff]: + """ + Diff all COTs in a schema document against current DB state. + + Returns a list of :class:`COTDiff` objects, one per entry in + ``schema_doc["types"]``. + """ + return [diff_cot(type_def) for type_def in schema_doc.get("types", [])] diff --git a/netbox_custom_objects/tests/test_comparator.py b/netbox_custom_objects/tests/test_comparator.py new file mode 100644 index 0000000..751ded0 --- /dev/null +++ b/netbox_custom_objects/tests/test_comparator.py @@ -0,0 +1,510 @@ +""" +Tests for the COT state comparator / diff engine (issue #387). + +Covers: +- No-change (clean) diff +- New COT (not in DB) +- COT-level attribute changes +- Field ADD (in schema, not in DB) +- Field REMOVE (tombstoned; still in DB) +- Field ALTER: rename, type change, scalar attribute changes +- Field ALTER: choice_set change +- Field ALTER: related_object_type change (built-in and custom) +- Field ALTER: related_object_filter change +- Untracked fields (no schema_id) → warning, not REMOVE +- DB field absent from schema AND not tombstoned → warning, not REMOVE +- Multi-COT document +- has_changes / has_destructive_changes / adds / removes / alters helpers +""" + +from django.test import TestCase + +from netbox_custom_objects.comparator import ( + COTDiff, + FieldOp, + diff_cot, + diff_document, +) +from netbox_custom_objects.exporter import export_cot, export_cots +from netbox_custom_objects.models import CustomObjectTypeField + +from .base import CustomObjectsTestCase + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _fc(diff: COTDiff, schema_id: int): + """Return the FieldChange for the given schema_id, or raise.""" + for fc in diff.field_changes: + if fc.schema_id == schema_id: + return fc + raise KeyError(f"No FieldChange with schema_id={schema_id}") + + +# --------------------------------------------------------------------------- +# Test cases +# --------------------------------------------------------------------------- + +class ComparatorCleanDiffTestCase(CustomObjectsTestCase, TestCase): + """Round-trip: export → diff should produce no changes.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type( + name='clean', slug='clean', version='1.0.0', + ) + cls.choice_set = cls.create_choice_set(name='Clean Choices') + cls.device_ot = cls.get_device_object_type() + + cls.create_custom_object_type_field(cls.cot, name='label', type='text') + cls.create_custom_object_type_field( + cls.cot, name='count', type='integer', + validation_minimum=0, validation_maximum=100, + ) + cls.create_custom_object_type_field( + cls.cot, name='status', type='select', choice_set=cls.choice_set, + ) + cls.create_custom_object_type_field( + cls.cot, name='device', type='object', + related_object_type=cls.device_ot, + ) + + def test_clean_diff_has_no_changes(self): + type_def = export_cot(self.cot) + result = diff_cot(type_def) + self.assertFalse(result.has_changes) + self.assertEqual(result.field_changes, []) + self.assertEqual(result.cot_changes, {}) + + def test_clean_diff_document_has_no_changes(self): + doc = export_cots([self.cot]) + diffs = diff_document(doc) + self.assertEqual(len(diffs), 1) + self.assertFalse(diffs[0].has_changes) + + def test_no_warnings_for_fully_tracked_cot(self): + type_def = export_cot(self.cot) + result = diff_cot(type_def) + self.assertEqual(result.warnings, []) + + +class ComparatorNewCOTTestCase(CustomObjectsTestCase, TestCase): + """COT not yet in DB → is_new=True, all fields are ADD.""" + + def test_new_cot_is_new(self): + result = diff_cot({ + "name": "newtype", + "slug": "newtype", + "fields": [ + {"id": 1, "name": "label", "type": "text"}, + {"id": 2, "name": "count", "type": "integer"}, + ], + }) + self.assertTrue(result.is_new) + + def test_new_cot_all_fields_are_add(self): + result = diff_cot({ + "name": "newtype2", + "slug": "newtype2", + "fields": [ + {"id": 1, "name": "label", "type": "text"}, + {"id": 2, "name": "count", "type": "integer"}, + ], + }) + self.assertEqual(len(result.field_changes), 2) + for fc in result.field_changes: + self.assertIs(fc.op, FieldOp.ADD) + + def test_new_cot_no_fields_no_field_changes(self): + result = diff_cot({"name": "empty", "slug": "empty"}) + self.assertTrue(result.is_new) + self.assertEqual(result.field_changes, []) + + +class ComparatorCOTAttrsTestCase(CustomObjectsTestCase, TestCase): + """COT-level attribute changes are captured in cot_changes.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type( + name='attrtest', slug='attr-test', + version='1.0.0', description='old desc', + verbose_name='', verbose_name_plural='', + ) + + def test_version_change_detected(self): + type_def = export_cot(self.cot) + type_def["version"] = "2.0.0" + result = diff_cot(type_def) + self.assertIn("version", result.cot_changes) + self.assertEqual(result.cot_changes["version"], ("1.0.0", "2.0.0")) + + def test_description_change_detected(self): + type_def = export_cot(self.cot) + type_def["description"] = "new desc" + result = diff_cot(type_def) + self.assertIn("description", result.cot_changes) + + def test_unchanged_cot_attrs_not_in_cot_changes(self): + type_def = export_cot(self.cot) + result = diff_cot(type_def) + self.assertNotIn("version", result.cot_changes) + self.assertNotIn("description", result.cot_changes) + + def test_cot_changes_do_not_produce_field_changes(self): + type_def = export_cot(self.cot) + type_def["version"] = "9.9.9" + result = diff_cot(type_def) + self.assertEqual(result.field_changes, []) + + +class ComparatorFieldAddTestCase(CustomObjectsTestCase, TestCase): + """Fields present in schema but absent from DB → ADD.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='addtest', slug='add-test') + cls.create_custom_object_type_field(cls.cot, name='existing', type='text') + + def test_new_field_in_schema_is_add(self): + type_def = export_cot(self.cot) + # Append a field with a new schema_id that doesn't exist in DB + type_def.setdefault("fields", []).append( + {"id": 999, "name": "brand_new", "type": "boolean"} + ) + result = diff_cot(type_def) + adds = result.adds + self.assertEqual(len(adds), 1) + self.assertEqual(adds[0].schema_id, 999) + self.assertIsNone(adds[0].db_name) + self.assertEqual(adds[0].schema_def["name"], "brand_new") + + def test_add_does_not_affect_existing_fields(self): + type_def = export_cot(self.cot) + type_def.setdefault("fields", []).append( + {"id": 999, "name": "extra", "type": "text"} + ) + result = diff_cot(type_def) + # existing field should not appear in changes + self.assertEqual(len(result.alters), 0) + + +class ComparatorFieldRemoveTestCase(CustomObjectsTestCase, TestCase): + """Fields tombstoned in schema and still in DB → REMOVE.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='removetest', slug='remove-test') + cls.field = cls.create_custom_object_type_field( + cls.cot, name='to_remove', type='text' + ) + + def test_tombstoned_db_field_is_remove(self): + field_schema_id = self.field.schema_id + type_def = { + "name": self.cot.name, + "slug": self.cot.slug, + "fields": [], + "removed_fields": [ + {"id": field_schema_id, "name": "to_remove", "type": "text"} + ], + } + result = diff_cot(type_def) + removes = result.removes + self.assertEqual(len(removes), 1) + self.assertEqual(removes[0].schema_id, field_schema_id) + self.assertEqual(removes[0].db_name, "to_remove") + + def test_remove_is_flagged_as_destructive(self): + field_schema_id = self.field.schema_id + type_def = { + "name": self.cot.name, + "slug": self.cot.slug, + "fields": [], + "removed_fields": [ + {"id": field_schema_id, "name": "to_remove", "type": "text"} + ], + } + result = diff_cot(type_def) + self.assertTrue(result.has_destructive_changes) + + def test_already_removed_from_db_is_noop(self): + """If tombstoned field is already gone from DB, no REMOVE emitted.""" + type_def = { + "name": self.cot.name, + "slug": self.cot.slug, + "fields": [], + "removed_fields": [ + {"id": 9999, "name": "ghost", "type": "text"} # not in DB + ], + } + result = diff_cot(type_def) + self.assertEqual(result.removes, []) + self.assertFalse(result.has_destructive_changes) + + +class ComparatorFieldAlterTestCase(CustomObjectsTestCase, TestCase): + """Attribute changes on existing fields → ALTER.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='altertest', slug='alter-test') + cls.choice_set_a = cls.create_choice_set(name='Set A') + cls.choice_set_b = cls.create_choice_set(name='Set B') + cls.device_ot = cls.get_device_object_type() + cls.site_ot = cls.get_site_object_type() + + def _alter_field(self, field_name, **overrides): + """Export the COT, override one field's schema dict, return diff.""" + type_def = export_cot(self.cot) + for sf in type_def.get("fields", []): + if sf["name"] == field_name: + sf.update(overrides) + break + return diff_cot(type_def) + + def test_rename_detected(self): + self.create_custom_object_type_field(self.cot, name='old_name', type='text') + result = self._alter_field("old_name", name="new_name") + fc = next(fc for fc in result.alters if fc.db_name == "old_name") + self.assertTrue(fc.is_rename) + self.assertEqual(fc.changed_attrs["name"], ("old_name", "new_name")) + + def test_type_change_detected(self): + self.create_custom_object_type_field(self.cot, name='typed', type='text') + result = self._alter_field("typed", type="longtext") + fc = next(fc for fc in result.alters if fc.db_name == "typed") + self.assertTrue(fc.is_type_change) + self.assertIn("type", fc.changed_attrs) + + def test_required_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='req_field', type='text', required=False + ) + result = self._alter_field("req_field", required=True) + fc = next(fc for fc in result.alters if fc.db_name == "req_field") + self.assertIn("required", fc.changed_attrs) + self.assertEqual(fc.changed_attrs["required"], (False, True)) + + def test_weight_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='weighted', type='text', weight=100 + ) + result = self._alter_field("weighted", weight=200) + fc = next(fc for fc in result.alters if fc.db_name == "weighted") + self.assertIn("weight", fc.changed_attrs) + + def test_validation_regex_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='regex_field', type='text', + validation_regex=r'^[A-Z]+$' + ) + result = self._alter_field("regex_field", validation_regex=r'^\d+$') + fc = next(fc for fc in result.alters if fc.db_name == "regex_field") + self.assertIn("validation_regex", fc.changed_attrs) + + def test_validation_min_max_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='numeric', type='integer', + validation_minimum=0, validation_maximum=100 + ) + result = self._alter_field("numeric", validation_minimum=10, validation_maximum=200) + fc = next(fc for fc in result.alters if fc.db_name == "numeric") + self.assertIn("validation_minimum", fc.changed_attrs) + self.assertIn("validation_maximum", fc.changed_attrs) + + def test_choice_set_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='select_field', type='select', + choice_set=self.choice_set_a, + ) + result = self._alter_field("select_field", choice_set="Set B") + fc = next(fc for fc in result.alters if fc.db_name == "select_field") + self.assertIn("choice_set", fc.changed_attrs) + self.assertEqual(fc.changed_attrs["choice_set"], ("Set A", "Set B")) + + def test_related_object_type_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='obj_field', type='object', + related_object_type=self.device_ot, + ) + result = self._alter_field("obj_field", related_object_type="dcim/site") + fc = next(fc for fc in result.alters if fc.db_name == "obj_field") + self.assertIn("related_object_type", fc.changed_attrs) + self.assertEqual(fc.changed_attrs["related_object_type"][0], "dcim/device") + self.assertEqual(fc.changed_attrs["related_object_type"][1], "dcim/site") + + def test_related_object_type_custom_cot_encoding(self): + other_cot = self.create_custom_object_type(name='rack', slug='rack') + rack_ot = other_cot.object_type + self.create_custom_object_type_field( + self.cot, name='rack_field', type='object', + related_object_type=rack_ot, + ) + # Export and re-diff — should be clean (no change) + type_def = export_cot(self.cot) + result = diff_cot(type_def) + rack_alters = [fc for fc in result.alters if fc.db_name == "rack_field"] + self.assertEqual(rack_alters, [], "Custom COT related_object_type round-trips cleanly") + + def test_related_object_filter_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='filtered', type='object', + related_object_type=self.device_ot, + related_object_filter={"site_id": [1]}, + ) + result = self._alter_field("filtered", related_object_filter={"site_id": [2]}) + fc = next(fc for fc in result.alters if fc.db_name == "filtered") + self.assertIn("related_object_filter", fc.changed_attrs) + + def test_no_alter_when_nothing_changed(self): + self.create_custom_object_type_field( + self.cot, name='unchanged', type='text' + ) + type_def = export_cot(self.cot) + result = diff_cot(type_def) + unchanged_alters = [fc for fc in result.alters if fc.db_name == "unchanged"] + self.assertEqual(unchanged_alters, []) + + def test_deprecated_change_detected(self): + self.create_custom_object_type_field( + self.cot, name='dep_field', type='text' + ) + result = self._alter_field( + "dep_field", + deprecated=True, + deprecated_since="1.1.0", + scheduled_removal="2.0.0", + ) + fc = next(fc for fc in result.alters if fc.db_name == "dep_field") + self.assertIn("deprecated", fc.changed_attrs) + self.assertIn("deprecated_since", fc.changed_attrs) + self.assertIn("scheduled_removal", fc.changed_attrs) + + +class ComparatorWarningsTestCase(CustomObjectsTestCase, TestCase): + """Warning conditions: untracked fields, ambiguous absences.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='warntest', slug='warn-test') + + def test_untracked_field_emits_warning(self): + f = self.create_custom_object_type_field( + self.cot, name='untracked', type='text' + ) + CustomObjectTypeField.objects.filter(pk=f.pk).update(schema_id=None) + type_def = export_cot(self.cot) # won't include the untracked field + result = diff_cot(type_def) + self.assertTrue( + any("untracked" in w for w in result.warnings), + "Expected warning about field with no schema_id", + ) + + def test_untracked_field_not_in_field_changes(self): + f = self.create_custom_object_type_field( + self.cot, name='untracked2', type='text' + ) + CustomObjectTypeField.objects.filter(pk=f.pk).update(schema_id=None) + type_def = export_cot(self.cot) + result = diff_cot(type_def) + self.assertEqual(result.field_changes, []) + + def test_db_field_absent_from_schema_and_not_tombstoned_emits_warning(self): + self.create_custom_object_type_field( + self.cot, name='mystery', type='text' + ) + # Schema with empty fields (field not tombstoned, just absent) + type_def = { + "name": self.cot.name, + "slug": self.cot.slug, + "fields": [], + } + result = diff_cot(type_def) + self.assertTrue( + any("mystery" in w for w in result.warnings), + "Expected warning about field absent from schema but not tombstoned", + ) + # Must NOT be a REMOVE + self.assertFalse(result.has_destructive_changes) + + +class ComparatorMultiCOTTestCase(CustomObjectsTestCase, TestCase): + """diff_document processes all COTs in the document.""" + + @classmethod + def setUpTestData(cls): + cls.cot1 = cls.create_custom_object_type(name='multi1', slug='multi-1') + cls.cot2 = cls.create_custom_object_type(name='multi2', slug='multi-2') + cls.create_custom_object_type_field(cls.cot1, name='f1', type='text') + cls.create_custom_object_type_field(cls.cot2, name='f2', type='text') + + def test_document_diff_returns_one_result_per_type(self): + doc = export_cots([self.cot1, self.cot2]) + diffs = diff_document(doc) + self.assertEqual(len(diffs), 2) + + def test_document_diff_clean_round_trip(self): + doc = export_cots([self.cot1, self.cot2]) + diffs = diff_document(doc) + for d in diffs: + self.assertFalse(d.has_changes) + + def test_document_diff_mixed_new_and_existing(self): + doc = export_cots([self.cot1]) + doc["types"].append({"name": "brandnew", "slug": "brand-new"}) + diffs = diff_document(doc) + self.assertEqual(len(diffs), 2) + existing = next(d for d in diffs if d.slug == "multi-1") + new_ = next(d for d in diffs if d.slug == "brand-new") + self.assertFalse(existing.is_new) + self.assertTrue(new_.is_new) + + +class ComparatorHelperPropertiesTestCase(CustomObjectsTestCase, TestCase): + """has_changes, has_destructive_changes, adds/removes/alters helpers.""" + + @classmethod + def setUpTestData(cls): + cls.cot = cls.create_custom_object_type(name='helpers', slug='helpers') + cls.field = cls.create_custom_object_type_field( + cls.cot, name='h1', type='text' + ) + + def test_has_changes_false_when_clean(self): + type_def = export_cot(self.cot) + result = diff_cot(type_def) + self.assertFalse(result.has_changes) + + def test_has_changes_true_when_field_added(self): + type_def = export_cot(self.cot) + type_def.setdefault("fields", []).append( + {"id": 999, "name": "extra", "type": "text"} + ) + result = diff_cot(type_def) + self.assertTrue(result.has_changes) + + def test_has_destructive_changes_false_when_no_removes(self): + type_def = export_cot(self.cot) + result = diff_cot(type_def) + self.assertFalse(result.has_destructive_changes) + + def test_adds_removes_alters_filters_correctly(self): + fid = self.field.schema_id + type_def = { + "name": self.cot.name, + "slug": self.cot.slug, + "fields": [ + {"id": fid, "name": "h1", "type": "text", "required": True}, # alter + {"id": 888, "name": "new_one", "type": "boolean"}, # add + ], + "removed_fields": [], + } + result = diff_cot(type_def) + self.assertEqual(len(result.adds), 1) + self.assertEqual(result.adds[0].schema_id, 888) + self.assertEqual(len(result.alters), 1) + self.assertEqual(result.alters[0].schema_id, fid) + self.assertEqual(result.removes, []) From 326c143c41769eae8663a6afa41a4f439b38b5b6 Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Wed, 8 Apr 2026 19:43:38 -0400 Subject: [PATCH 5/9] Remove 0006 migration --- .../migrations/0006_backfill_schema_ids.py | 63 ------------------- 1 file changed, 63 deletions(-) delete mode 100644 netbox_custom_objects/migrations/0006_backfill_schema_ids.py diff --git a/netbox_custom_objects/migrations/0006_backfill_schema_ids.py b/netbox_custom_objects/migrations/0006_backfill_schema_ids.py deleted file mode 100644 index 8be00e2..0000000 --- a/netbox_custom_objects/migrations/0006_backfill_schema_ids.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -Data migration: assign schema_id to existing CustomObjectTypeField rows that -predate the schema-format feature and never received one. - -Strategy --------- -For each CustomObjectType: - 1. Find the current maximum schema_id already in use (may be 0 if none). - 2. Assign the next available integer to every field with schema_id=NULL, - ordered by the field's primary-key (creation order) for determinism. - 3. Update next_schema_id on the parent CustomObjectType to the highest ID - now assigned, so that future field additions continue from the right value. - -The reverse operation is intentionally a no-op: rolling back would leave the -schema_id column in an indeterminate state, and re-running the forward -migration is safe (it only touches NULL rows). -""" - -from django.db import migrations -from django.db.models import Max - - -# Exposed as a module-level name so tests can import and call it directly -# without going through the migration runner. -def assign_schema_ids(apps, schema_editor): - CustomObjectType = apps.get_model('netbox_custom_objects', 'CustomObjectType') - CustomObjectTypeField = apps.get_model('netbox_custom_objects', 'CustomObjectTypeField') - - for cot in CustomObjectType.objects.all(): - # Highest schema_id already in use for this COT (0 if none). - current_max = ( - CustomObjectTypeField.objects - .filter(custom_object_type=cot, schema_id__isnull=False) - .aggregate(max_id=Max('schema_id'))['max_id'] or 0 - ) - - # Assign the next integers to all unassigned fields, ordered by pk. - next_id = current_max + 1 - for field in ( - CustomObjectTypeField.objects - .filter(custom_object_type=cot, schema_id__isnull=True) - .order_by('id') - ): - CustomObjectTypeField.objects.filter(pk=field.pk).update(schema_id=next_id) - next_id += 1 - - # Sync next_schema_id upward. Never decrease it. - highest_assigned = next_id - 1 # equals current_max when no NULLs existed - if highest_assigned > cot.next_schema_id: - CustomObjectType.objects.filter(pk=cot.pk).update( - next_schema_id=highest_assigned - ) - - -class Migration(migrations.Migration): - - dependencies = [ - ('netbox_custom_objects', '0005_customobjecttype_next_schema_id_and_more'), - ] - - operations = [ - migrations.RunPython(assign_schema_ids, migrations.RunPython.noop), - ] From 7a04010ee0d696b843e11ffaddbc88b33f3c81e0 Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Wed, 8 Apr 2026 20:05:43 -0400 Subject: [PATCH 6/9] Cleanup --- netbox_custom_objects/comparator.py | 50 ++++++++++++++----- .../tests/test_comparator.py | 13 ----- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/netbox_custom_objects/comparator.py b/netbox_custom_objects/comparator.py index 2e6d032..fbb6b25 100644 --- a/netbox_custom_objects/comparator.py +++ b/netbox_custom_objects/comparator.py @@ -39,7 +39,6 @@ round-trip compatible with the schema format. """ -import logging import re from dataclasses import dataclass, field from enum import Enum @@ -52,8 +51,6 @@ SCHEMA_TYPE_TO_CHOICES, ) -logger = logging.getLogger(__name__) - # Matches TableModel (generated model names for custom object types). _TABLE_MODEL_RE = re.compile(r'^table(\d+)model$', re.IGNORECASE) @@ -157,22 +154,34 @@ def alters(self) -> list[FieldChange]: # Internal helpers # --------------------------------------------------------------------------- -def _encode_rot(rot) -> str: +def _encode_rot(rot, cot_slug_cache: dict, warnings: list) -> str: """ Encode a related ObjectType FK as a schema ``related_object_type`` string. Mirrors the logic in exporter._encode_related_object_type. + + *cot_slug_cache* is a ``{pk: slug}`` dict pre-fetched in :func:`diff_cot` + to avoid one DB query per object field. If a custom COT is referenced but + not present in the cache (i.e. its DB row was deleted), a warning is + appended to *warnings* and a stable fallback string is returned so the + diff can still proceed. """ if rot.app_label == constants.APP_LABEL: m = _TABLE_MODEL_RE.match(rot.model) if m: - from netbox_custom_objects.models import CustomObjectType # noqa: PLC0415 cot_id = int(m.group(1)) - slug = CustomObjectType.objects.values_list('slug', flat=True).get(pk=cot_id) + slug = cot_slug_cache.get(cot_id) + if slug is None: + warnings.append( + f"CustomObjectType pk={cot_id} is referenced by a field but " + "no longer exists in the DB. The related_object_type comparison " + "for this field may be inaccurate." + ) + return f"{CUSTOM_OBJECTS_APP_LABEL_SLUG}/" return f"{CUSTOM_OBJECTS_APP_LABEL_SLUG}/{slug}" return f"{rot.app_label}/{rot.model}" -def _compare_field_attrs(db_field, schema_field: dict) -> dict[str, tuple]: +def _compare_field_attrs(db_field, schema_field: dict, cot_slug_cache: dict, warnings: list) -> dict[str, tuple]: """ Return ``{attr: (db_value, schema_value)}`` for every attribute that differs between *db_field* (a ``CustomObjectTypeField`` instance) and @@ -226,7 +235,7 @@ def _compare_field_attrs(db_field, schema_field: dict) -> dict[str, tuple]: changes["choice_set"] = (dv, sv) if "related_object_type" in type_specific: - dv = _encode_rot(db_field.related_object_type) if db_field.related_object_type_id else None + dv = _encode_rot(db_field.related_object_type, cot_slug_cache, warnings) if db_field.related_object_type_id else None sv = schema_field.get("related_object_type") if dv != sv: changes["related_object_type"] = (dv, sv) @@ -303,8 +312,10 @@ def diff_cot(type_def: dict) -> COTDiff: schema_fields: dict[int, dict] = { sf["id"]: sf for sf in type_def.get("fields", []) } - tombstoned_ids: set[int] = { - rf["id"] for rf in type_def.get("removed_fields", []) + # Map schema_id → full tombstone dict so REMOVE FieldChanges carry the + # original field definition (name, type, etc.) rather than an empty dict. + tombstoned: dict[int, dict] = { + rf["id"]: rf for rf in type_def.get("removed_fields", []) } db_fields: dict[int, object] = { f.schema_id: f @@ -312,6 +323,19 @@ def diff_cot(type_def: dict) -> COTDiff: .select_related("choice_set", "related_object_type") } + # Pre-fetch slugs for all custom-COT related_object_type references in a + # single query to avoid one DB round-trip per object/multiobject field. + cot_ids: set[int] = set() + for f in db_fields.values(): + if f.related_object_type_id and f.related_object_type.app_label == constants.APP_LABEL: + m = _TABLE_MODEL_RE.match(f.related_object_type.model) + if m: + cot_ids.add(int(m.group(1))) + cot_slug_cache: dict[int, str] = ( + dict(CustomObjectType.objects.filter(pk__in=cot_ids).values_list("pk", "slug")) + if cot_ids else {} + ) + # Warn about DB fields with no schema_id (invisible to the diff). untracked = cot.fields.filter(schema_id__isnull=True) for f in untracked: @@ -324,7 +348,7 @@ def diff_cot(type_def: dict) -> COTDiff: for schema_id, schema_field in schema_fields.items(): if schema_id in db_fields: db_field = db_fields[schema_id] - changed = _compare_field_attrs(db_field, schema_field) + changed = _compare_field_attrs(db_field, schema_field, cot_slug_cache, diff.warnings) if changed: diff.field_changes.append(FieldChange( op=FieldOp.ALTER, @@ -345,12 +369,12 @@ def diff_cot(type_def: dict) -> COTDiff: for schema_id, db_field in db_fields.items(): if schema_id in schema_fields: continue # already handled above - if schema_id in tombstoned_ids: + if schema_id in tombstoned: diff.field_changes.append(FieldChange( op=FieldOp.REMOVE, schema_id=schema_id, db_name=db_field.name, - schema_def={}, + schema_def=tombstoned[schema_id], )) else: diff.warnings.append( diff --git a/netbox_custom_objects/tests/test_comparator.py b/netbox_custom_objects/tests/test_comparator.py index 751ded0..308f065 100644 --- a/netbox_custom_objects/tests/test_comparator.py +++ b/netbox_custom_objects/tests/test_comparator.py @@ -20,7 +20,6 @@ from django.test import TestCase from netbox_custom_objects.comparator import ( - COTDiff, FieldOp, diff_cot, diff_document, @@ -31,18 +30,6 @@ from .base import CustomObjectsTestCase -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _fc(diff: COTDiff, schema_id: int): - """Return the FieldChange for the given schema_id, or raise.""" - for fc in diff.field_changes: - if fc.schema_id == schema_id: - return fc - raise KeyError(f"No FieldChange with schema_id={schema_id}") - - # --------------------------------------------------------------------------- # Test cases # --------------------------------------------------------------------------- From 4197403e023d5999aaff0cbbf04cf689a27e5f3c Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Wed, 8 Apr 2026 20:08:04 -0400 Subject: [PATCH 7/9] Ruff fix --- netbox_custom_objects/comparator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/netbox_custom_objects/comparator.py b/netbox_custom_objects/comparator.py index fbb6b25..bed5387 100644 --- a/netbox_custom_objects/comparator.py +++ b/netbox_custom_objects/comparator.py @@ -235,7 +235,9 @@ def _compare_field_attrs(db_field, schema_field: dict, cot_slug_cache: dict, war changes["choice_set"] = (dv, sv) if "related_object_type" in type_specific: - dv = _encode_rot(db_field.related_object_type, cot_slug_cache, warnings) if db_field.related_object_type_id else None + dv = ( + _encode_rot(db_field.related_object_type, cot_slug_cache, warnings) + ) if db_field.related_object_type_id else None sv = schema_field.get("related_object_type") if dv != sv: changes["related_object_type"] = (dv, sv) From fa581d0b183ebf50e2f97df4016e7b6c43292eae Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Wed, 8 Apr 2026 20:23:01 -0400 Subject: [PATCH 8/9] Address review feedback --- netbox_custom_objects/comparator.py | 32 +++++++++++-------- .../tests/test_comparator.py | 8 ++++- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/netbox_custom_objects/comparator.py b/netbox_custom_objects/comparator.py index bed5387..9935600 100644 --- a/netbox_custom_objects/comparator.py +++ b/netbox_custom_objects/comparator.py @@ -42,8 +42,12 @@ import re from dataclasses import dataclass, field from enum import Enum +from typing import TYPE_CHECKING from netbox_custom_objects import constants + +if TYPE_CHECKING: + from django.contrib.contenttypes.models import ContentType from netbox_custom_objects.schema_format import ( CUSTOM_OBJECTS_APP_LABEL_SLUG, FIELD_DEFAULTS, @@ -154,7 +158,7 @@ def alters(self) -> list[FieldChange]: # Internal helpers # --------------------------------------------------------------------------- -def _encode_rot(rot, cot_slug_cache: dict, warnings: list) -> str: +def _encode_rot(rot: "ContentType", cot_slug_cache: dict, warnings: list) -> str: """ Encode a related ObjectType FK as a schema ``related_object_type`` string. Mirrors the logic in exporter._encode_related_object_type. @@ -259,6 +263,9 @@ def _compare_cot_attrs(cot, type_def: dict) -> dict[str, tuple]: """ changes: dict[str, tuple] = {} for attr in _COT_ATTRS: + # All _COT_ATTRS are string fields; None and "" both mean "absent" — same + # convention as the exporter. _COT_ATTRS must never include numeric or + # boolean fields, as `or ""` would swallow falsy values like 0 or False. db_val = getattr(cot, attr) or "" schema_val = type_def.get(attr) or "" if db_val != schema_val: @@ -319,11 +326,16 @@ def diff_cot(type_def: dict) -> COTDiff: tombstoned: dict[int, dict] = { rf["id"]: rf for rf in type_def.get("removed_fields", []) } - db_fields: dict[int, object] = { - f.schema_id: f - for f in cot.fields.filter(schema_id__isnull=False) - .select_related("choice_set", "related_object_type") - } + # Single query for all fields; partition into tracked/untracked in Python. + db_fields: dict[int, object] = {} + for f in cot.fields.select_related("choice_set", "related_object_type"): + if f.schema_id is None: + diff.warnings.append( + f"Field {f.name!r} (pk={f.pk}) has no schema_id and cannot be " + "tracked by the schema diff. It will not be affected by apply operations." + ) + else: + db_fields[f.schema_id] = f # Pre-fetch slugs for all custom-COT related_object_type references in a # single query to avoid one DB round-trip per object/multiobject field. @@ -338,14 +350,6 @@ def diff_cot(type_def: dict) -> COTDiff: if cot_ids else {} ) - # Warn about DB fields with no schema_id (invisible to the diff). - untracked = cot.fields.filter(schema_id__isnull=True) - for f in untracked: - diff.warnings.append( - f"Field {f.name!r} (pk={f.pk}) has no schema_id and cannot be " - "tracked by the schema diff. It will not be affected by apply operations." - ) - # ── Schema fields → ADD or ALTER ───────────────────────────────────────── for schema_id, schema_field in schema_fields.items(): if schema_id in db_fields: diff --git a/netbox_custom_objects/tests/test_comparator.py b/netbox_custom_objects/tests/test_comparator.py index 308f065..8b6d3d1 100644 --- a/netbox_custom_objects/tests/test_comparator.py +++ b/netbox_custom_objects/tests/test_comparator.py @@ -13,7 +13,7 @@ - Field ALTER: related_object_filter change - Untracked fields (no schema_id) → warning, not REMOVE - DB field absent from schema AND not tombstoned → warning, not REMOVE -- Multi-COT document +- Multi-COT document (including empty/missing types key) - has_changes / has_destructive_changes / adds / removes / alters helpers """ @@ -449,6 +449,12 @@ def test_document_diff_mixed_new_and_existing(self): self.assertFalse(existing.is_new) self.assertTrue(new_.is_new) + def test_document_missing_types_key_returns_empty(self): + self.assertEqual(diff_document({}), []) + + def test_document_empty_types_list_returns_empty(self): + self.assertEqual(diff_document({"types": []}), []) + class ComparatorHelperPropertiesTestCase(CustomObjectsTestCase, TestCase): """has_changes, has_destructive_changes, adds/removes/alters helpers.""" From 13e3f40cfc9d7a1d496e19b09084fb0f9994dbed Mon Sep 17 00:00:00 2001 From: Brian Tiemann Date: Wed, 8 Apr 2026 20:36:21 -0400 Subject: [PATCH 9/9] Address review feedback --- netbox_custom_objects/comparator.py | 28 +++++++++++++++---- .../tests/test_comparator.py | 28 +++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/netbox_custom_objects/comparator.py b/netbox_custom_objects/comparator.py index 9935600..1c36cf3 100644 --- a/netbox_custom_objects/comparator.py +++ b/netbox_custom_objects/comparator.py @@ -33,7 +33,10 @@ and schema.removed_fields is ambiguous (possibly added outside the workflow) and generates a warning instead. - Type changes are included in changed_attrs but are not validated here; the - executor decides whether to allow or reject them. + executor decides whether to allow or reject them. When the type changes, + type-specific attributes from the *old* DB type (e.g. validation_regex on a + text field being converted to integer) are not included in the diff — only + attributes relevant to the incoming schema type are compared. - related_object_type values are compared in their encoded schema form ("app_label/model" or "custom-objects/") so the diff output is round-trip compatible with the schema format. @@ -56,7 +59,7 @@ ) # Matches TableModel (generated model names for custom object types). -_TABLE_MODEL_RE = re.compile(r'^table(\d+)model$', re.IGNORECASE) +_TABLE_MODEL_RE = re.compile(r'^table(\d+)model$') # Ordered base attributes compared between DB and schema for each field. # Does NOT include 'name' or 'type' — those are handled separately. @@ -134,6 +137,13 @@ class COTDiff: @property def has_changes(self) -> bool: + """True if there are attribute-level or field-level changes to apply. + + Note: a brand-new COT with no fields yields ``is_new=True`` but + ``has_changes=False`` — there are no individual changes to apply, but + the COT itself must still be created. Callers should check ``is_new`` + independently when deciding whether to run a create operation. + """ return bool(self.cot_changes or self.field_changes) @property @@ -158,10 +168,12 @@ def alters(self) -> list[FieldChange]: # Internal helpers # --------------------------------------------------------------------------- -def _encode_rot(rot: "ContentType", cot_slug_cache: dict, warnings: list) -> str: +def _encode_related_object_type(rot: "ContentType", cot_slug_cache: dict, warnings: list) -> str: """ Encode a related ObjectType FK as a schema ``related_object_type`` string. - Mirrors the logic in exporter._encode_related_object_type. + Shares the same encoding logic as ``exporter._encode_related_object_type`` + but uses a pre-fetched slug cache and a warnings list rather than a live + DB query, making it safe to call in a tight loop. *cot_slug_cache* is a ``{pk: slug}`` dict pre-fetched in :func:`diff_cot` to avoid one DB query per object field. If a custom COT is referenced but @@ -240,7 +252,7 @@ def _compare_field_attrs(db_field, schema_field: dict, cot_slug_cache: dict, war if "related_object_type" in type_specific: dv = ( - _encode_rot(db_field.related_object_type, cot_slug_cache, warnings) + _encode_related_object_type(db_field.related_object_type, cot_slug_cache, warnings) ) if db_field.related_object_type_id else None sv = schema_field.get("related_object_type") if dv != sv: @@ -286,6 +298,12 @@ def diff_cot(type_def: dict) -> COTDiff: Returns a :class:`COTDiff` describing what would need to change. """ + missing = [k for k in ("slug", "name") if k not in type_def] + if missing: + raise ValueError( + f"type_def is missing required key(s) {missing}; got keys: {list(type_def)}" + ) + from netbox_custom_objects.models import CustomObjectType # noqa: PLC0415 slug = type_def["slug"] diff --git a/netbox_custom_objects/tests/test_comparator.py b/netbox_custom_objects/tests/test_comparator.py index 8b6d3d1..fcb3de0 100644 --- a/netbox_custom_objects/tests/test_comparator.py +++ b/netbox_custom_objects/tests/test_comparator.py @@ -13,6 +13,7 @@ - Field ALTER: related_object_filter change - Untracked fields (no schema_id) → warning, not REMOVE - DB field absent from schema AND not tombstoned → warning, not REMOVE +- _encode_related_object_type deleted-COT path → warning + stable fallback string - Multi-COT document (including empty/missing types key) - has_changes / has_destructive_changes / adds / removes / alters helpers """ @@ -21,6 +22,7 @@ from netbox_custom_objects.comparator import ( FieldOp, + _encode_related_object_type, diff_cot, diff_document, ) @@ -109,6 +111,14 @@ def test_new_cot_no_fields_no_field_changes(self): self.assertTrue(result.is_new) self.assertEqual(result.field_changes, []) + def test_missing_required_keys_raises_value_error(self): + with self.assertRaises(ValueError): + diff_cot({"name": "no-slug"}) + with self.assertRaises(ValueError): + diff_cot({"slug": "no-name"}) + with self.assertRaises(ValueError): + diff_cot({}) + class ComparatorCOTAttrsTestCase(CustomObjectsTestCase, TestCase): """COT-level attribute changes are captured in cot_changes.""" @@ -250,6 +260,8 @@ def _alter_field(self, field_name, **overrides): if sf["name"] == field_name: sf.update(overrides) break + else: + raise AssertionError(f"field {field_name!r} not found in exported schema") return diff_cot(type_def) def test_rename_detected(self): @@ -399,6 +411,22 @@ def test_untracked_field_not_in_field_changes(self): result = diff_cot(type_def) self.assertEqual(result.field_changes, []) + def test_encode_related_object_type_deleted_cot_emits_warning(self): + """_encode_related_object_type emits a warning and returns a stable fallback string when + the referenced CustomObjectType no longer exists (slug not in cache). + This is the only warning path in comparator.py that can't be triggered + via diff_cot because CustomObjectType.delete() removes referencing fields + before deleting the ObjectType, making the state unreachable via normal + model deletion. + """ + target = self.create_custom_object_type(name='rot-target', slug='rot-target') + rot = target.object_type # ObjectType with app_label matching constants.APP_LABEL + warnings = [] + result = _encode_related_object_type(rot, cot_slug_cache={}, warnings=warnings) + self.assertEqual(len(warnings), 1) + self.assertIn("no longer exists", warnings[0]) + self.assertIn("