diff --git a/README.md b/README.md index 1a3cf5f0..68796f72 100644 --- a/README.md +++ b/README.md @@ -32,20 +32,6 @@ $ ./manage.py migrate sudo systemctl restart netbox netbox-rq ``` -> [!NOTE] -> If you are using NetBox Custom Objects with NetBox Branching, you need to insert the following into your `configuration.py`. See the docs for a full description of how NetBox Custom Objects currently works with NetBox Branching. - -``` -PLUGINS_CONFIG = { - 'netbox_branching': { - 'exempt_models': [ - 'netbox_custom_objects.customobjecttype', - 'netbox_custom_objects.customobjecttypefield', - ], - }, -} -``` - ## Known Limitations NetBox Custom Objects is now Generally Available which means you can use it in production and migrations to future versions will work. There are many upcoming features including GraphQL support - the best place to see what's on the way is the [issues](https://github.com/netboxlabs/netbox-custom-objects/issues) list on the GitHub repository. diff --git a/netbox_custom_objects/__init__.py b/netbox_custom_objects/__init__.py index af41097c..b1ec966e 100644 --- a/netbox_custom_objects/__init__.py +++ b/netbox_custom_objects/__init__.py @@ -32,6 +32,89 @@ def _migration_finished(sender, **kwargs): _migrations_checked = None +def _patch_get_serializer_for_model(): + """ + Patch utilities.api.get_serializer_for_model to handle dynamically-generated + custom object models. + + The default implementation resolves serializers by import path convention + (e.g. netbox_custom_objects.api.serializers.Table1ModelSerializer). Dynamic + models have no importable serializer at that path, so the call raises + SerializerNotFound. This patch intercepts the lookup for APP_LABEL models and + delegates to get_serializer_class(), which generates the serializer on the fly. + """ + import utilities.api as _api_utils + + _original = _api_utils.get_serializer_for_model + + def _patched(model, prefix=''): + # Only intercept dynamically-generated custom object models (Table1Model, + # Table2Model, …) identified by their Table{n}Model name pattern. + # CustomObjectType and CustomObjectTypeField live in the same app but + # have importable serializers and must go through the normal path. + if getattr(model, '_meta', None) and model._meta.app_label == APP_LABEL \ + and extract_cot_id_from_model_name(model.__name__.lower()) is not None: + from netbox_custom_objects.api.serializers import get_serializer_class + return get_serializer_class(model) + return _original(model, prefix=prefix) + + _api_utils.get_serializer_for_model = _patched + + # Also patch the reference already imported into extras.events (and anywhere + # else that did `from utilities.api import get_serializer_for_model` before + # our patch ran). + try: + import extras.events as _extras_events + _extras_events.get_serializer_for_model = _patched + except (ImportError, AttributeError): + pass + + +def _patch_check_object_accessible_in_branch(): + """ + Patch check_object_accessible_in_branch to use an existence check instead of + a full SELECT for custom object models. + + The original implementation does model.objects.get(pk=object_id) which issues + SELECT * including every custom field column. If a field was renamed in the + branch but the stable db_column is not yet reflected in the model (e.g. due to + a stale cache), this can raise ProgrammingError. For custom objects we only + need to know whether the row exists, so filter(pk=...).exists() is sufficient + and avoids referencing any column other than the primary key. + """ + try: + import netbox_branching.signal_receivers as _sr + from netbox_branching.utilities import deactivate_branch + from netbox_branching.models import ChangeDiff + from core.choices import ObjectChangeActionChoices + from django.contrib.contenttypes.models import ContentType + + _original = _sr.check_object_accessible_in_branch + + def _patched(branch, model, object_id): + if model._meta.app_label != APP_LABEL: + return _original(branch, model, object_id) + + # Check existence in main using only the pk — avoids SELECT on + # renamed columns that may not yet exist in main. + with deactivate_branch(): + if model.objects.filter(pk=object_id).exists(): + return True + + # Not in main — was it created in this branch? + content_type = ContentType.objects.get_for_model(model) + return ChangeDiff.objects.filter( + branch=branch, + object_type=content_type, + object_id=object_id, + action=ObjectChangeActionChoices.ACTION_CREATE, + ).exists() + + _sr.check_object_accessible_in_branch = _patched + except (ImportError, AttributeError): + pass + + def _patch_object_selector_view(): """ Patch ObjectSelectorView to support dynamically-generated custom object models. @@ -183,6 +266,14 @@ def ready(self): # Patch ObjectSelectorView to support dynamically-generated custom object models _patch_object_selector_view() + # Patch get_serializer_for_model so event rules, job serializers, etc. can + # resolve serializers for dynamically-generated custom object models. + _patch_get_serializer_for_model() + + # Patch check_object_accessible_in_branch to use pk-only existence check, + # avoiding SELECT * which references renamed columns that may not exist in main. + _patch_check_object_accessible_in_branch() + # Suppress warnings about database calls during app initialization with warnings.catch_warnings(): warnings.filterwarnings( diff --git a/netbox_custom_objects/api/views.py b/netbox_custom_objects/api/views.py index 3ce1107d..56a70cc5 100644 --- a/netbox_custom_objects/api/views.py +++ b/netbox_custom_objects/api/views.py @@ -17,13 +17,8 @@ class ETagMixin: # pragma: no cover – NetBox < 4.6 shim from netbox_custom_objects.filtersets import get_filterset_class from netbox_custom_objects.models import CustomObjectType, CustomObjectTypeField -from netbox_custom_objects.utilities import is_in_branch - from . import serializers -# Constants -BRANCH_ACTIVE_ERROR_MESSAGE = _("Please switch to the main branch to perform this operation.") - class RootView(APIRootView): def get_view_name(self): @@ -77,14 +72,9 @@ def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) def create(self, request, *args, **kwargs): - if is_in_branch(): - raise ValidationError(BRANCH_ACTIVE_ERROR_MESSAGE) return super().create(request, *args, **kwargs) def update(self, request, *args, **kwargs): - if is_in_branch(): - raise ValidationError(BRANCH_ACTIVE_ERROR_MESSAGE) - # Replicate DRF's UpdateModelMixin.update() so we can snapshot the instance # before the serializer is constructed. Calling super().update() would invoke # get_object() a second time and return a fresh, un-snapshotted instance. diff --git a/netbox_custom_objects/field_types.py b/netbox_custom_objects/field_types.py index 3b881f2c..df3ded23 100644 --- a/netbox_custom_objects/field_types.py +++ b/netbox_custom_objects/field_types.py @@ -987,11 +987,12 @@ def after_model_generation(self, instance, model, field_name): target_field.remote_field.model = to_model target_field.related_model = to_model - def create_m2m_table(self, instance, model, field_name): + def create_m2m_table(self, instance, model, field_name, schema_conn=None): """ Creates the actual M2M table after models are fully generated """ - from django.db import connection + from django.db import connection as default_connection + connection = schema_conn if schema_conn is not None else default_connection # Get the field instance field = model._meta.get_field(field_name) diff --git a/netbox_custom_objects/filtersets.py b/netbox_custom_objects/filtersets.py index 740e2075..f5cbced5 100644 --- a/netbox_custom_objects/filtersets.py +++ b/netbox_custom_objects/filtersets.py @@ -3,6 +3,7 @@ from django.contrib.postgres.fields import ArrayField from django.db.models import JSONField, Q from django.utils.dateparse import parse_date, parse_datetime +from django.utils.timezone import make_aware, is_aware from extras.choices import CustomFieldTypeChoices from netbox.filtersets import NetBoxModelFilterSet @@ -89,6 +90,8 @@ def search(self, queryset, name, value): elif field.type == CustomFieldTypeChoices.TYPE_DATETIME: parsed = parse_datetime(value) if parsed is not None: + if not is_aware(parsed): + parsed = make_aware(parsed) q |= Q(**{f"{field.name}__exact": parsed}) if not q: return queryset.none() diff --git a/netbox_custom_objects/forms.py b/netbox_custom_objects/forms.py index 0642ca34..1157c231 100644 --- a/netbox_custom_objects/forms.py +++ b/netbox_custom_objects/forms.py @@ -176,6 +176,7 @@ class CustomObjectTypeFieldForm(CustomFieldForm): class Meta: model = CustomObjectTypeField fields = "__all__" + exclude = ('db_column',) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/netbox_custom_objects/migrations/0007_fix_object_field_fk_deferrable.py b/netbox_custom_objects/migrations/0007_fix_object_field_fk_deferrable.py new file mode 100644 index 00000000..6e57b53b --- /dev/null +++ b/netbox_custom_objects/migrations/0007_fix_object_field_fk_deferrable.py @@ -0,0 +1,74 @@ +""" +Drop DEFERRABLE INITIALLY DEFERRED from FK constraints on custom object tables. + +Prior to this migration, _ensure_field_fk_constraint() created FK constraints +with DEFERRABLE INITIALLY DEFERRED. That attribute causes PostgreSQL to queue +trigger events that block subsequent ALTER TABLE calls (e.g. remove_field during +a branch revert), raising "cannot ALTER TABLE because it has pending trigger +events". + +This migration finds all DEFERRABLE FK constraints on tables whose names start +with "custom_objects_" and recreates them as non-DEFERRABLE with ON DELETE +CASCADE, matching the behaviour of the updated _ensure_field_fk_constraint(). +""" + +from django.db import migrations + + +def fix_deferrable_fk_constraints(apps, schema_editor): + """ + Re-create any DEFERRABLE FK constraints on custom object tables as + non-DEFERRABLE. Uses information_schema so no Django model loading + is required — safe to run during the migration pass even though dynamic + models are not yet registered. + """ + with schema_editor.connection.cursor() as cursor: + # Find all DEFERRABLE FK constraints on custom_objects_* tables. + cursor.execute(""" + SELECT + tc.table_name, + tc.constraint_name, + kcu.column_name, + ccu.table_name AS foreign_table_name + FROM information_schema.table_constraints AS tc + JOIN information_schema.key_column_usage AS kcu + ON tc.constraint_name = kcu.constraint_name + AND tc.table_schema = kcu.table_schema + JOIN information_schema.constraint_column_usage AS ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + JOIN information_schema.referential_constraints AS rc + ON tc.constraint_name = rc.constraint_name + AND tc.table_schema = rc.constraint_schema + WHERE tc.constraint_type = 'FOREIGN KEY' + AND tc.table_name LIKE 'custom_objects\\_%%' + AND tc.is_deferrable = 'YES' + """) + rows = cursor.fetchall() + + for table_name, constraint_name, column_name, foreign_table in rows: + new_constraint_name = f'{table_name}_{column_name}_fk_cascade' + cursor.execute( + f'ALTER TABLE "{table_name}" DROP CONSTRAINT "{constraint_name}"' + ) + cursor.execute(f""" + ALTER TABLE "{table_name}" + ADD CONSTRAINT "{new_constraint_name}" + FOREIGN KEY ("{column_name}") + REFERENCES "{foreign_table}" ("id") + ON DELETE CASCADE + """) + + +class Migration(migrations.Migration): + + dependencies = [ + ('netbox_custom_objects', '0006_customobjecttypefield_related_name_and_more'), + ] + + operations = [ + migrations.RunPython( + fix_deferrable_fk_constraints, + migrations.RunPython.noop, + ), + ] diff --git a/netbox_custom_objects/migrations/0008_customobjecttypefield_db_column.py b/netbox_custom_objects/migrations/0008_customobjecttypefield_db_column.py new file mode 100644 index 00000000..70ff4917 --- /dev/null +++ b/netbox_custom_objects/migrations/0008_customobjecttypefield_db_column.py @@ -0,0 +1,49 @@ +""" +Add ``db_column`` to CustomObjectTypeField and back-fill it from ``name``. + +``db_column`` is frozen at field creation time so that subsequent renames are +pure metadata operations — the physical database column name never changes. +This prevents cross-schema column-name mismatches when a field is renamed in +one schema (e.g. a branch) and the model is then used to query a different +schema (e.g. main) that still has the original column name. + +The data migration sets ``db_column = name`` for all existing fields so that +``effective_db_column`` returns the same value as before the migration. +""" + +from django.db import migrations, models + + +def backfill_db_column(apps, schema_editor): + """Set db_column = name for all existing CustomObjectTypeField rows.""" + CustomObjectTypeField = apps.get_model('netbox_custom_objects', 'CustomObjectTypeField') + CustomObjectTypeField.objects.filter(db_column='').update(db_column=models.F('name')) + + +class Migration(migrations.Migration): + + dependencies = [ + ('netbox_custom_objects', '0007_fix_object_field_fk_deferrable'), + ] + + operations = [ + migrations.AddField( + model_name='customobjecttypefield', + name='db_column', + field=models.CharField( + blank=True, + default='', + help_text=( + 'Physical database column name. Set once at creation and never changed, ' + 'so renames are pure metadata changes that do not require DDL.' + ), + max_length=50, + verbose_name='database column', + ), + preserve_default=False, + ), + migrations.RunPython( + backfill_db_column, + migrations.RunPython.noop, + ), + ] diff --git a/netbox_custom_objects/models.py b/netbox_custom_objects/models.py index aabb2927..ba0c27b5 100644 --- a/netbox_custom_objects/models.py +++ b/netbox_custom_objects/models.py @@ -1,4 +1,6 @@ +import contextvars import decimal +import logging import re import threading from datetime import date, datetime @@ -58,6 +60,9 @@ from netbox_custom_objects.utilities import _suppress_clear_cache, generate_model +logger = logging.getLogger('netbox_custom_objects.models') + + class UniquenessConstraintTestError(Exception): """Custom exception used to signal successful uniqueness constraint test.""" @@ -66,6 +71,302 @@ class UniquenessConstraintTestError(Exception): USER_TABLE_DATABASE_NAME_PREFIX = "custom_objects_" +# Per-context storage for CO field values deferred during squash merge. +# Using ContextVar instead of a class-level dict so that concurrent merges +# (different threads or coroutines) each get an isolated copy. +# Shape: {db_table: {co_pk: {'using': alias, 'data': {field_name: value}}}} +_deferred_co_field_data: contextvars.ContextVar[dict | None] = contextvars.ContextVar( + '_deferred_co_field_data', default=None +) + + +def _get_schema_connection(): + """ + Return the active branch's DB connection when called within a branch context, + otherwise return the default (main-schema) connection. + + Used so that schema-editor operations (add/alter/remove column) target the + correct PostgreSQL schema without requiring every call-site to be branch-aware. + """ + try: + from netbox_branching.contextvars import active_branch + branch = active_branch.get() + if branch is not None: + from django.db import connections + return connections[branch.connection_name] + except ImportError: + pass + return connection + + +def _apply_deferred_co_field(field_instance): + """ + Apply any deferred CO field values after a column is added to the DB. + + Called by CustomObjectTypeField.save() after schema_editor.add_field() so that + custom object rows inserted before their columns existed (squash merge ordering) + receive their correct values via a raw UPDATE. + + ``_deferred_co_field_data`` (ContextVar) has the shape:: + + {db_table: {co_pk: {'using': alias, 'data': {field_name: value}}}} + + For TYPE_OBJECT fields the postchange_data key is ``{name}`` but the DB column + is ``{name}_id`` — this function maps accordingly. + For TYPE_MULTIOBJECT fields there is no column on the main table, so they are + skipped entirely. + """ + from extras.choices import CustomFieldTypeChoices + + # No deferred data at all — fast path. + deferred = _deferred_co_field_data.get() + if not deferred: + return + + cot = field_instance.custom_object_type + table_name = cot.get_database_table_name() + per_table = deferred.get(table_name) + if not per_table: + return + + # M2M has no column on the main table — nothing to UPDATE. + if field_instance.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT: + return + + # For TYPE_OBJECT the data key is the field name but the DB column ends with _id. + data_key = field_instance.name + if field_instance.type == CustomFieldTypeChoices.TYPE_OBJECT: + col_name = f'{field_instance.name}_id' + else: + col_name = field_instance.name + + schema_conn = _get_schema_connection() + + with schema_conn.cursor() as cursor: + for co_pk, entry in per_table.items(): + value = entry['data'].get(data_key) + if value is None: + continue + # table_name comes from get_database_table_name() (controlled by our + # code) and col_name from field.name, which is validated by the + # ^[a-z0-9_]+$ regex — no double-quote characters are possible, so + # the f-string interpolation is safe against SQL injection here. + cursor.execute( + f'UPDATE "{table_name}" SET "{col_name}" = %s WHERE id = %s', + [value, co_pk], + ) + + # Remove the consumed key from each entry so that processed field data does + # not persist in the ContextVar beyond its useful lifetime (e.g. on a retry + # after a partial failure, stale data from a previous attempt is avoided). + for entry in per_table.values(): + entry['data'].pop(data_key, None) + + # Prune entries whose data dict is now exhausted. + exhausted = [pk for pk, entry in per_table.items() if not entry['data']] + for pk in exhausted: + del per_table[pk] + if not per_table: + del deferred[table_name] + if not deferred: + _deferred_co_field_data.set(None) + + +def _schema_add_field(fi, model, schema_editor, schema_conn): + """ + Issue ``add_field`` against the physical schema for *fi*. + + Handles through-table creation for MULTIOBJECT fields. Does NOT apply + deferred CO field data — callers that need that (squash merge context) must + call ``_apply_deferred_co_field(fi)`` separately after this returns. + + Idempotent: skips the ALTER TABLE if the column already exists (e.g. when + sync/merge replays an ObjectChange that was already applied). + """ + ft = FIELD_TYPE_CLASS[fi.type]() + mf = ft.get_model_field(fi, db_column=fi.effective_db_column) + mf.contribute_to_class(model, fi.name) + + with schema_conn.cursor() as cursor: + existing_cols = { + col.name + for col in schema_conn.introspection.get_table_description(cursor, model._meta.db_table) + } + if mf.column in existing_cols: + logger.debug('_schema_add_field: %r already exists on %s, skipping', mf.column, model._meta.db_table) + return + + schema_editor.add_field(model, mf) + if fi.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT: + ft.create_m2m_table(fi, model, fi.name, schema_conn=schema_conn) + + +def _schema_remove_field(fi, model, schema_editor, existing_tables=None): + """ + Issue ``remove_field`` against the physical schema for *fi*. + + For MULTIOBJECT fields the through table is dropped first. When + *existing_tables* is a pre-fetched list only tables present in it are + dropped; when it is ``None`` (main-schema context) the drop is always + attempted. + + Always issues ``SET CONSTRAINTS ALL IMMEDIATE`` before ``remove_field`` to + flush any DEFERRABLE FK trigger events that would otherwise cause PostgreSQL + to reject the subsequent ALTER TABLE. + """ + ft = FIELD_TYPE_CLASS[fi.type]() + mf = ft.get_model_field(fi, db_column=fi.effective_db_column) + mf.contribute_to_class(model, fi.name) + + if fi.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT: + through_table = fi.through_table_name + if existing_tables is None or through_table in existing_tables: + through_meta = type( + 'Meta', (), + {'db_table': through_table, 'app_label': APP_LABEL, 'managed': True}, + ) + through_model = type( + f'_TempThrough_{through_table}', + (models.Model,), + {'Meta': through_meta, '__module__': 'netbox_custom_objects.models'}, + ) + schema_editor.delete_model(through_model) + + # Flush any pending DEFERRABLE FK trigger events before ALTER TABLE; + # otherwise PostgreSQL raises "pending trigger events" when removing a FK field. + schema_editor.execute('SET CONSTRAINTS ALL IMMEDIATE') + schema_editor.remove_field(model, mf) + + +def _schema_alter_field(old_fi, new_fi, model, schema_editor, schema_conn, existing_tables=None): + """ + Issue ``alter_field`` against the physical schema, updating *old_fi* to *new_fi*. + + For MULTIOBJECT fields whose name changes the through table is renamed before + ``alter_field`` is called. When the old through table is absent (e.g. the + branch has never seen this field) the new through table is created from scratch + instead. + + *existing_tables* — optional pre-fetched table name list from the target + connection. When given, through-table operations are guarded by membership + checks. When ``None`` (main-schema context) the schema_conn is introspected + once on demand. + + Idempotent: skips the ALTER TABLE if the old column is already gone and the + new column already exists (e.g. when sync/merge replays an ObjectChange that + was already applied). + + Conflict resolution: when neither the old nor the new column exists (the field + was independently renamed in the target schema — e.g. branch renamed A→X while + main renamed A→Y), the live field record is looked up from the target schema to + find the actual current column name, which is then renamed to the new target. + A warning is logged to flag the conflict. + """ + old_is_m2m = old_fi.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT + new_is_m2m = new_fi.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT + + # A type change between MULTIOBJECT and a scalar type (or vice versa) is not + # a simple column rename/alter — the storage representation is fundamentally + # different (through-table vs column). Attempting alter_field in this case + # would fail at the DB level. Log and skip; the caller is expected to handle + # such changes as remove + add rather than alter. + if old_is_m2m != new_is_m2m: + logger.warning( + '_schema_alter_field: skipping unsupported type change %r→%r on %s ' + '(MULTIOBJECT ↔ scalar changes require remove+add, not alter)', + old_fi.type, new_fi.type, model._meta.db_table, + ) + return + + old_mf = FIELD_TYPE_CLASS[old_fi.type]().get_model_field(old_fi, db_column=old_fi.effective_db_column) + new_mf = FIELD_TYPE_CLASS[new_fi.type]().get_model_field(new_fi, db_column=new_fi.effective_db_column) + old_mf.contribute_to_class(model, old_fi.name) + new_mf.contribute_to_class(model, new_fi.name) + + with schema_conn.cursor() as cursor: + existing_cols = { + col.name + for col in schema_conn.introspection.get_table_description(cursor, model._meta.db_table) + } + if old_mf.column not in existing_cols: + if new_mf.column in existing_cols: + logger.debug( + '_schema_alter_field: %r already renamed to %r on %s, skipping', + old_mf.column, new_mf.column, model._meta.db_table, + ) + return + if old_is_m2m: + # M2M fields have no physical column; the old through table is absent. + return + # Scalar field: neither the old nor the new column exists. The field was + # independently renamed in this schema (e.g. branch renamed A→X while main + # renamed A→Y; now applying main's rename to the branch). Look up the live + # field record in the target schema to find the actual column and rename it. + logger.warning( + '_schema_alter_field: rename conflict on %s — source column %r and ' + 'target column %r are both absent; field pk=%d was independently renamed ' + 'in this schema; resolving by looking up live column', + model._meta.db_table, old_mf.column, new_mf.column, new_fi.pk, + ) + try: + live_fi = CustomObjectTypeField.objects.using(schema_conn.alias).get(pk=new_fi.pk) + except CustomObjectTypeField.DoesNotExist: + logger.debug( + '_schema_alter_field: field pk=%d not found in %s; skipping', + new_fi.pk, schema_conn.alias, + ) + return + live_mf = FIELD_TYPE_CLASS[live_fi.type]().get_model_field(live_fi, db_column=live_fi.effective_db_column) + live_mf.contribute_to_class(model, live_fi.name) + if live_mf.column not in existing_cols: + logger.debug( + '_schema_alter_field: live column %r also absent on %s; skipping', + live_mf.column, model._meta.db_table, + ) + return + schema_editor.alter_field(model, live_mf, new_mf) + return + + if ( + new_is_m2m + and old_fi.name != new_fi.name + ): + old_through = old_fi.through_table_name + new_through = new_fi.through_table_name + + tables = existing_tables + if tables is None: + with schema_conn.cursor() as cursor: + tables = schema_conn.introspection.table_names(cursor) + + if old_through in tables: + old_through_meta = type( + 'Meta', (), + {'db_table': old_through, 'app_label': APP_LABEL, 'managed': True}, + ) + old_through_model = generate_model( + f'_TempOldThrough_{old_through}', + (models.Model,), + { + '__module__': 'netbox_custom_objects.models', + 'Meta': old_through_meta, + 'id': models.AutoField(primary_key=True), + 'source': models.ForeignKey( + model, on_delete=models.CASCADE, db_column='source_id', related_name='+', + ), + 'target': models.ForeignKey( + model, on_delete=models.CASCADE, db_column='target_id', related_name='+', + ), + }, + ) + schema_editor.alter_db_table(old_through_model, old_through, new_through) + else: + # Old through table absent — create the new one from scratch + ft = FIELD_TYPE_CLASS[new_fi.type]() + ft.create_m2m_table(new_fi, model, new_fi.name, schema_conn=schema_conn) + + schema_editor.alter_field(model, old_mf, new_mf) + class CustomObject( BookmarksMixin, @@ -99,6 +400,73 @@ class CustomObject( class Meta: abstract = True + @classmethod + def deserialize_object(cls, data, pk=None): + """ + Hook called by ObjectChange.apply() for CREATE actions. + + The squash merge strategy may apply a CO's CREATE before its + CustomObjectTypeField rows are in main (the dependency graph has no FK + from CO to fields). When that happens, the standard Django + deserialization would INSERT the CO with NULL custom-field values + because the columns don't exist yet. + + This hook: + 1. Deserializes the CO using a fresh model (re-queried from DB). + 2. Does save_base(raw=True) as normal. + 3. Stores the full postchange_data in _deferred_co_field_data (ContextVar) + so that CustomObjectTypeField.save() can UPDATE the row after each + column is added (handles the squash ordering case). + """ + from utilities.serialization import deserialize_object as _deserialize + from netbox_custom_objects.utilities import extract_cot_id_from_model_name + + # Derive the COT primary key from the model class name (e.g. 'Table1Model' → 1) + cot_id_str = extract_cot_id_from_model_name(cls.__name__.lower()) + if cot_id_str is None: + # Not a generated model name — fall back to standard deserialization. + return _deserialize(cls, data, pk=pk) + cot_id = int(cot_id_str) # regex guarantees digits-only + + # Refresh the model cache so we pick up any fields already applied to main. + # (In the squash case the cache may still point to a zero-field model.) + from netbox_custom_objects.models import CustomObjectType as _COT # noqa: F401 + _COT.clear_model_cache(cot_id) + try: + cot = _COT.objects.get(pk=cot_id) + fresh_model = cot.get_model() + except _COT.DoesNotExist: + fresh_model = cls + + inner = _deserialize(fresh_model, data, pk=pk) + obj = inner.object + table_name = fresh_model._meta.db_table + full_data = dict(data) + + class _Deserialized: + object = obj + + def save(self, using=None, **_kwargs): + from django.db import DEFAULT_DB_ALIAS + _using = using or DEFAULT_DB_ALIAS + models.Model.save_base(obj, using=_using, raw=True) + # Read pk after save_base so that auto-assigned PKs are captured. + # (If pk was None before save_base, obj.pk is now the DB-assigned id.) + obj_pk = obj.pk + # Register full data for deferred column updates (squash ordering fix). + deferred = _deferred_co_field_data.get() + if deferred is None: + deferred = {} + _deferred_co_field_data.set(deferred) + if table_name not in deferred: + deferred[table_name] = {} + deferred[table_name][obj_pk] = { + 'using': _using, + 'data': full_data, + } + + return _Deserialized() + def __str__(self): # Find the field with primary=True and return that field's "name" as the name of the object primary_field = self._field_objects.get(self._primary_field_id, None) @@ -238,6 +606,9 @@ def __str__(self): return self.display_name def clean(self): + # Guard against None (can arrive via update_object during branch revert) + if self.custom_field_data is None: + self.custom_field_data = {} super().clean() if not self.slug: @@ -337,6 +708,13 @@ def get_cached_through_models(cls, custom_object_type_id): """ return cls._through_model_cache.get(custom_object_type_id, {}) + def serialize_object(self, exclude=None): + # cache_timestamp is an internal cache-invalidation field; exclude it + # from ObjectChange records so it doesn't appear as a tracked change. + extra = ['cache_timestamp'] + combined = list(exclude or []) + extra + return super().serialize_object(exclude=combined) + def get_absolute_url(self): return reverse("plugins:netbox_custom_objects:customobjecttype", args=[self.pk]) @@ -379,7 +757,7 @@ def _fetch_and_generate_field_attrs( field_name = field.name field_attrs[field.name] = field_type.get_model_field( - field, + field, db_column=field.effective_db_column, ) # Add to field objects only if the field was successfully generated @@ -654,7 +1032,7 @@ def _ensure_field_fk_constraint(self, model, field_name): related_table = related_model._meta.db_table column_name = model_field.column - with connection.cursor() as cursor: + with _get_schema_connection().cursor() as cursor: # Drop existing FK constraint if it exists # Query for existing constraints cursor.execute(""" @@ -669,7 +1047,9 @@ def _ensure_field_fk_constraint(self, model, field_name): constraint_name = row[0] cursor.execute(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"') - # Create new FK constraint with ON DELETE CASCADE + # Create new FK constraint with ON DELETE CASCADE. + # Not DEFERRABLE: a deferred constraint leaves pending trigger events that block + # subsequent ALTER TABLE calls (e.g. during branch revert remove_field). constraint_name = f"{table_name}_{column_name}_fk_cascade" cursor.execute(f""" ALTER TABLE "{table_name}" @@ -677,7 +1057,6 @@ def _ensure_field_fk_constraint(self, model, field_name): FOREIGN KEY ("{column_name}") REFERENCES "{related_table}" ("id") ON DELETE CASCADE - DEFERRABLE INITIALLY DEFERRED """) def _ensure_all_fk_constraints(self, model): @@ -700,18 +1079,61 @@ def create_model(self): # Ensure the ContentType exists and is immediately available features = get_model_features(model) - if 'branching' in features: - features.remove('branching') self.object_type.features = features self.object_type.public = True self.object_type.save() - with connection.schema_editor() as schema_editor: + with _get_schema_connection().schema_editor() as schema_editor: schema_editor.create_model(model) get_serializer_class(model) self.register_custom_object_search_index(model) + @classmethod + def deserialize_object(cls, data, pk=None): + """ + Custom deserialization hook for netbox-branching's merge/revert engine. + + ``ObjectChange.apply()`` normally uses ``DeserializedObject.save()``, which + calls ``Model.save_base(raw=True)`` — bypassing our ``save()`` override and + all ``post_save`` signals. That means ``create_model()`` never runs and the + physical table is never created in the destination schema. + + By implementing this classmethod the apply engine calls our version instead, + returning a wrapper whose ``save()`` invokes the full ``CustomObjectType.save()`` + lifecycle (signals included) so that the table is created as a side effect of + replaying the ObjectChange. + + ``object_type`` is cleared before saving so the ``custom_object_type_post_save_handler`` + can re-create and link it correctly in the destination schema, avoiding any FK + mismatch between the branch and main ``ObjectType`` pks. + """ + from utilities.serialization import deserialize_object as _deserialize + + inner = _deserialize(cls, data, pk=pk) + + class _SchemaAwareDeserialized: + def __init__(self, deserialized): + self._inner = deserialized + self.object = deserialized.object + + def save(self, using=None, **kwargs): + # Snapshot before modifying so that diff()['pre'] records the + # current state rather than showing all fields as None on revert. + self.object.snapshot() + # Clear the ObjectType FK — it may not exist in main yet. + # custom_object_type_post_save_handler re-sets it after INSERT. + self.object.object_type = None + self.object.object_type_id = None + self.object.save() + # Re-apply any M2M data (tags, etc.) that was stripped during deserialization. + if self._inner.m2m_data: + for accessor_name, object_list in self._inner.m2m_data.items(): + getattr(self.object, accessor_name).set(object_list) + self._inner.m2m_data = None + + return _SchemaAwareDeserialized(inner) + def save(self, *args, **kwargs): needs_db_create = self._state.adding @@ -728,20 +1150,41 @@ def delete(self, *args, **kwargs): self.clear_model_cache(self.id) model = self.get_model() + schema_conn = _get_schema_connection() + in_branch = schema_conn is not connection # Delete all CustomObjectTypeFields that reference this CustomObjectType for field in CustomObjectTypeField.objects.filter(related_object_type=self.object_type): field.delete() object_type = ObjectType.objects.get_for_model(model) - ObjectChange.objects.filter(changed_object_type=object_type).delete() + + # ObjectChange and ObjectType records live in the main schema. Only clean + # them up when operating outside a branch; inside a branch they belong to + # main and must not be touched until the deletion is merged. + if not in_branch: + ObjectChange.objects.filter(changed_object_type=object_type).delete() + super().delete(*args, **kwargs) - # Temporarily disconnect the pre_delete handler to skip the ObjectType deletion - # TODO: Remove this disconnect/reconnect after ObjectType has been exempted from handle_deleted_object - pre_delete.disconnect(handle_deleted_object) - object_type.delete() - with connection.schema_editor() as schema_editor: + if not in_branch: + # ChangeDiff has a PROTECT FK to ContentType/ObjectType — delete those + # records first so object_type.delete() is not blocked. + try: + from netbox_branching.models import ChangeDiff + ChangeDiff.objects.filter(object_type=object_type).delete() + except ImportError: + pass + # Temporarily disconnect the pre_delete handler to skip the ObjectType deletion + # TODO: Remove this disconnect/reconnect after ObjectType has been exempted from handle_deleted_object + pre_delete.disconnect(handle_deleted_object) + object_type.delete() + pre_delete.connect(handle_deleted_object) + + with schema_conn.schema_editor() as schema_editor: + # Drop through tables before the main table (they have FKs pointing to it). + for through_model in getattr(model, '_through_models', []): + schema_editor.delete_model(through_model) schema_editor.delete_model(model) # Unregister the model and its through-models from Django's app registry so @@ -767,9 +1210,6 @@ def delete(self, *args, **kwargs): # Re-clear the model cache to remove re-cached model from get_model. self.clear_model_cache(self.id) - # Reconnect the pre_delete handler after all cleanup is done. - pre_delete.connect(handle_deleted_object) - @receiver(post_save, sender=CustomObjectType) def custom_object_type_post_save_handler(sender, instance, created, **kwargs): @@ -780,10 +1220,66 @@ def custom_object_type_post_save_handler(sender, instance, created, **kwargs): app_label=APP_LABEL, model=content_type_name ) + # Snapshot before modifying so change logging records a correct pre-state. + # Without this, diff()['pre'] would set all fields to None during branch revert. + instance.snapshot() instance.object_type = ct instance.save() +def _rename_objectchange_field_key(fi, old_name, new_name): + """ + Rename a JSON key in all ObjectChange records for CustomObject instances of + this field's type, reflecting a field rename from *old_name* to *new_name*. + + Updates both ``prechange_data`` and ``postchange_data`` in the ObjectChange + table, and ``original``/``modified``/``current`` in netbox-branching's + ChangeDiff table when that plugin is installed. + + Field names are validated with ``^[a-z0-9_]+$`` so string formatting of the + column names here is safe against SQL injection. + + This runs inside the same ``transaction.atomic()`` block as + ``CustomObjectTypeField.save()``, so it rolls back cleanly if the enclosing + transaction is aborted. + """ + from django.db import connections + + cot = fi.custom_object_type + model = cot.get_model() + ct = ContentType.objects.get_for_model(model) + conn = _get_schema_connection() + + oc_sql = ( + 'UPDATE core_objectchange ' + 'SET {col} = ({col} - %s) || jsonb_build_object(%s, {col}->%s) ' + 'WHERE changed_object_type_id = %s AND {col} ? %s' + ) + with connections[conn.alias].cursor() as cursor: + for json_col in ('prechange_data', 'postchange_data'): + cursor.execute(oc_sql.format(col=json_col), [old_name, new_name, old_name, ct.id, old_name]) + + logger.debug('_rename_objectchange_field_key: %r → %r for %s', old_name, new_name, ct) + + try: + from netbox_branching.models import ChangeDiff # noqa: F401 — presence check only + cd_sql = ( + 'UPDATE netbox_branching_changediff ' + 'SET {col} = ({col} - %s) || jsonb_build_object(%s, {col}->%s) ' + 'WHERE object_type_id = %s AND {col} IS NOT NULL AND {col} ? %s' + ) + with connections[conn.alias].cursor() as cursor: + for json_col in ('original', 'modified', 'current'): + cursor.execute(cd_sql.format(col=json_col), [old_name, new_name, old_name, ct.id, old_name]) + except ImportError: + pass # netbox-branching not installed + except Exception: + logger.debug( + '_rename_objectchange_field_key: ChangeDiff rename failed for %r → %r', + old_name, new_name, exc_info=True, + ) + + class CustomObjectTypeField(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel): custom_object_type = models.ForeignKey( CustomObjectType, on_delete=models.CASCADE, related_name="fields" @@ -835,6 +1331,15 @@ class CustomObjectTypeField(CloningMixin, ExportTemplatesMixin, ChangeLoggedMode ), ), ) + db_column = models.CharField( + verbose_name=_("database column"), + max_length=50, + blank=True, + help_text=_( + "Physical database column name. Set once at creation and never changed, " + "so renames are pure metadata changes that do not require DDL." + ), + ) label = models.CharField( verbose_name=_("label"), max_length=50, @@ -1028,6 +1533,16 @@ def is_single_value(self): def many(self): return self.type in ["multiobject"] + @property + def effective_db_column(self): + """ + Return the physical database column name for this field. + + ``db_column`` is frozen at creation time so that renames are pure + metadata operations — the physical column name never changes. + """ + return self.db_column + def get_child_relations(self, instance): return instance.get_field_value(self) @@ -1142,19 +1657,21 @@ def clean(self): {"unique": _("Uniqueness cannot be enforced for boolean or multiobject fields")} ) - # Check if uniqueness constraint can be applied when changing from non-unique to unique + # Check if uniqueness constraint can be applied when changing from non-unique to unique. + # Skip when _original is absent (e.g. during deserialization in branch merge/revert). if ( self.pk and self.unique - and not self.original.unique and not self._state.adding + and hasattr(self, '_original') + and not self.original.unique ): field_type = FIELD_TYPE_CLASS[self.type]() - model_field = field_type.get_model_field(self) + model_field = field_type.get_model_field(self, db_column=self.effective_db_column) model = self.custom_object_type.get_model() model_field.contribute_to_class(model, self.name) - old_field = field_type.get_model_field(self.original) + old_field = field_type.get_model_field(self.original, db_column=self.original.effective_db_column) old_field.contribute_to_class(model, self._original_name) try: @@ -1625,116 +2142,62 @@ def through_table_name(self): def through_model_name(self): return f"Through_{self.through_table_name}" + @classmethod + def deserialize_object(cls, data, pk=None): + """ + Custom deserialization hook for netbox-branching's merge/revert engine. + + Same problem as ``CustomObjectType.deserialize_object``: the default + ``DeserializedObject.save(raw=True)`` bypasses ``CustomObjectTypeField.save()``, + so the physical column is never added to the custom object table. + + This wrapper calls the real ``save()`` so that ``add_field`` runs as a side + effect of replaying the CREATE ObjectChange during a merge. + """ + from utilities.serialization import deserialize_object as _deserialize + + inner = _deserialize(cls, data, pk=pk) + + class _SchemaAwareDeserialized: + def __init__(self, deserialized): + self._inner = deserialized + self.object = deserialized.object + + def save(self, using=None, **kwargs): + self.object.save() + if self._inner.m2m_data: + for accessor_name, object_list in self._inner.m2m_data.items(): + getattr(self.object, accessor_name).set(object_list) + self._inner.m2m_data = None + + return _SchemaAwareDeserialized(inner) + def save(self, *args, **kwargs): is_new = self._state.adding - field_type = FIELD_TYPE_CLASS[self.type]() - model_field = field_type.get_model_field(self) - model = self.custom_object_type.get_model() - model_field.contribute_to_class(model, self.name) - with connection.schema_editor() as schema_editor: - if self._state.adding: - schema_editor.add_field(model, model_field) - if self.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT: - field_type.create_m2m_table(self, model, self.name) - else: - old_field = field_type.get_model_field(self.original) - old_field.contribute_to_class(model, self._original_name) + # Freeze the physical column name at creation. db_column is set once + # here and never updated, so subsequent renames only update the ORM + # field name — no DDL is required for renames. + if self._state.adding and not self.db_column: + self.db_column = self.name - # Special handling for MultiObject fields when the name changes - if ( - self.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT - and self.name != self._original_name - ): - # For renamed MultiObject fields, we just need to rename the through table - old_through_table_name = self.original.through_table_name - new_through_table_name = self.through_table_name - - # Check if old through table exists - with connection.cursor() as cursor: - tables = connection.introspection.table_names(cursor) - old_table_exists = old_through_table_name in tables - - if old_table_exists: - # Create temporary models to represent the old and new through table states - old_through_meta = type( - "Meta", - (), - { - "db_table": old_through_table_name, - "app_label": APP_LABEL, - "managed": True, - }, - ) - old_through_model = generate_model( - f"TempOld{self.original.through_model_name}", - (models.Model,), - { - "__module__": "netbox_custom_objects.models", - "Meta": old_through_meta, - "id": models.AutoField(primary_key=True), - "source": models.ForeignKey( - model, - on_delete=models.CASCADE, - db_column="source_id", - related_name="+", - ), - "target": models.ForeignKey( - model, - on_delete=models.CASCADE, - db_column="target_id", - related_name="+", - ), - }, - ) + # Use the branch connection when operating inside a branch so that schema + # editor operations target the branch schema rather than main. + schema_conn = _get_schema_connection() - new_through_meta = type( - "Meta", - (), - { - "db_table": new_through_table_name, - "app_label": APP_LABEL, - "managed": True, - }, - ) - new_through_model = generate_model( - f"TempNew{self.through_model_name}", - (models.Model,), - { - "__module__": "netbox_custom_objects.models", - "Meta": new_through_meta, - "id": models.AutoField(primary_key=True), - "source": models.ForeignKey( - model, - on_delete=models.CASCADE, - db_column="source_id", - related_name="+", - ), - "target": models.ForeignKey( - model, - on_delete=models.CASCADE, - db_column="target_id", - related_name="+", - ), - }, - ) - new_through_model # To silence ruff error + model = self.custom_object_type.get_model() - # Rename the table using Django's schema editor - schema_editor.alter_db_table( - old_through_model, - old_through_table_name, - new_through_table_name, - ) - else: - # No old table exists, create the new through table - field_type.create_m2m_table(self, model, self.name) + with schema_conn.schema_editor() as schema_editor: + if self._state.adding: + _schema_add_field(self, model, schema_editor, schema_conn) + _apply_deferred_co_field(self) + else: + _schema_alter_field(self.original, self, model, schema_editor, schema_conn) - # Alter the field normally (this updates the field definition) - schema_editor.alter_field(model, old_field, model_field) - else: - # Normal field alteration - schema_editor.alter_field(model, old_field, model_field) + # When the field is renamed, update ObjectChange / ChangeDiff JSON keys so + # historical audit records and branch diffs stay consistent with the new name. + if not self._state.adding and self._original_name != self.name: + _rename_objectchange_field_key(self, self._original_name, self.name) # Ensure FK constraints are properly created for OBJECT fields with CASCADE behavior should_ensure_fk = False @@ -1756,7 +2219,10 @@ def save(self, *args, **kwargs): # Clear and refresh the model cache for this CustomObjectType when a field is modified self.custom_object_type.clear_model_cache(self.custom_object_type.id) - # Update parent's cache_timestamp to invalidate cache across all workers + # Update parent's cache_timestamp to invalidate cache across all workers. + # snapshot() must be called first so that change logging has a correct pre-state; + # without it, diff()['pre'] would set ALL fields to None during branch revert. + self.custom_object_type.snapshot() self.custom_object_type.save(update_fields=['cache_timestamp']) super().save(*args, **kwargs) @@ -1768,8 +2234,18 @@ def ensure_constraint(): transaction.on_commit(ensure_constraint) + # Regenerate the model from DB now that the field change is persisted. + # _schema_alter_field adds both old and new field names to the model + # class via contribute_to_class, and something between clear_model_cache + # and super().save() (e.g. a post_save signal on the COT) can call + # get_model() while the DB still has the old name, caching a stale model + # without the new name. Forcing a no_cache regeneration here (after + # super().save() committed the new name) ensures apps.all_models holds a + # clean model with exactly the current DB field list. + updated_model = self.custom_object_type.get_model(no_cache=True) + # Reregister SearchIndex with new set of searchable fields - self.custom_object_type.register_custom_object_search_index(model) + self.custom_object_type.register_custom_object_search_index(updated_model) # Reindex all objects of this type if search indexing was affected if is_new: @@ -1781,28 +2257,33 @@ def ensure_constraint(): transaction.on_commit(lambda: ReindexCustomObjectTypeJob.enqueue(cot_id=_cot_id)) def delete(self, *args, **kwargs): - field_type = FIELD_TYPE_CLASS[self.type]() - model_field = field_type.get_model_field(self) + # Use the branch connection when operating inside a branch. + schema_conn = _get_schema_connection() + model = self.custom_object_type.get_model() - model_field.contribute_to_class(model, self.name) - with connection.schema_editor() as schema_editor: - if self.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT: - apps = model._meta.apps - through_model = apps.get_model(APP_LABEL, self.through_model_name) - schema_editor.delete_model(through_model) - schema_editor.remove_field(model, model_field) + with schema_conn.schema_editor() as schema_editor: + _schema_remove_field(self, model, schema_editor) # Clear the model cache for this CustomObjectType when a field is deleted self.custom_object_type.clear_model_cache(self.custom_object_type.id) - # Update parent's cache_timestamp to invalidate cache across all workers + # Update parent's cache_timestamp to invalidate cache across all workers. + # snapshot() must be called first so that change logging has a correct pre-state. + self.custom_object_type.snapshot() self.custom_object_type.save(update_fields=['cache_timestamp']) super().delete(*args, **kwargs) + # Regenerate and re-register the model so the app registry no longer includes + # the removed field. During squash revert the squash strategy may try to query + # CO rows (model.objects.get(pk=...)) after undoing this field but before undoing + # the CO itself. If the stale model class is still in the app registry it will + # include the now-absent column in its SELECT, causing ProgrammingError. + updated_model = self.custom_object_type.get_model() + # Reregister SearchIndex with new set of searchable fields - self.custom_object_type.register_custom_object_search_index(model) + self.custom_object_type.register_custom_object_search_index(updated_model) # Reindex all objects of this type since a searchable field was removed if self.search_weight > 0: diff --git a/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_edit.html b/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_edit.html index 2ed1a515..7371ebec 100644 --- a/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_edit.html +++ b/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_edit.html @@ -9,10 +9,6 @@ {# Edit form #}
- {% if branch_warning %} - {% include 'netbox_custom_objects/inc/branch_warning.html' %} - {% endif %} -
{% csrf_token %} @@ -85,7 +81,7 @@

{% trans "Comments" %}

{% trans "Cancel" %} - +
diff --git a/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_import.html b/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_import.html index 3ae447b5..8bce96e7 100644 --- a/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_import.html +++ b/netbox_custom_objects/templates/netbox_custom_objects/custom_object_bulk_import.html @@ -10,10 +10,6 @@
- {% if branch_warning %} - {% include 'netbox_custom_objects/inc/branch_warning.html' %} - {% endif %} -
{% csrf_token %} @@ -36,7 +32,7 @@ {% if return_url %} {% trans "Cancel" %} {% endif %} - +
@@ -68,7 +64,7 @@ {% if return_url %} {% trans "Cancel" %} {% endif %} - +
@@ -101,7 +97,7 @@ {% if return_url %} {% trans "Cancel" %} {% endif %} - + diff --git a/netbox_custom_objects/templates/netbox_custom_objects/customobject_edit.html b/netbox_custom_objects/templates/netbox_custom_objects/customobject_edit.html index 6cba146a..fdf82574 100644 --- a/netbox_custom_objects/templates/netbox_custom_objects/customobject_edit.html +++ b/netbox_custom_objects/templates/netbox_custom_objects/customobject_edit.html @@ -13,10 +13,6 @@ {% endblock title %} {% block form %} - {% if branch_warning %} - {% include 'netbox_custom_objects/inc/branch_warning.html' %} - {% endif %} - {# Render hidden fields #} {% for field in form.hidden_fields %} {{ field }} @@ -50,15 +46,15 @@

{{ group }}

{% block buttons %} {% trans "Cancel" %} {% if object.pk %} - {% else %}
- -
diff --git a/netbox_custom_objects/templates/netbox_custom_objects/inc/branch_warning.html b/netbox_custom_objects/templates/netbox_custom_objects/inc/branch_warning.html deleted file mode 100644 index ed7d0274..00000000 --- a/netbox_custom_objects/templates/netbox_custom_objects/inc/branch_warning.html +++ /dev/null @@ -1,12 +0,0 @@ -{% load i18n %} - - diff --git a/netbox_custom_objects/tests/base.py b/netbox_custom_objects/tests/base.py index ae48fc57..e5c052d2 100644 --- a/netbox_custom_objects/tests/base.py +++ b/netbox_custom_objects/tests/base.py @@ -1,12 +1,55 @@ # Test utilities for netbox_custom_objects plugin +from django.apps import apps as django_apps +from django.contrib.contenttypes.management import create_contenttypes from django.test import Client from core.models import ObjectType from extras.models import CustomFieldChoiceSet from utilities.testing import create_test_user +from netbox_custom_objects.constants import APP_LABEL from netbox_custom_objects.models import CustomObjectType, CustomObjectTypeField +def _recreate_contenttypes(): + """Recreate ContentType rows for all installed apps using get_or_create. + + Called after a TransactionTestCase flush so that subsequent test classes — + whether TransactionTestCase or regular TestCase — can look up ContentTypes. + Using create_contenttypes (get_or_create) avoids the duplicate-key + violations that serialized_rollback causes in the parallel test runner. + """ + for app_config in django_apps.get_app_configs(): + create_contenttypes(app_config, verbosity=0) + + +def _purge_stale_generated_models(): + """Remove dynamically generated CustomObject models from the app registry. + + Regular TestCase subclasses wrap each test in a transaction that is rolled + back at the end. Rolling back the transaction drops any tables created by + DDL inside the test (e.g. CREATE TABLE custom_objects_1), but Django's + in-memory model registry is NOT rolled back. The stale model entry then + causes problems for subsequent TransactionTestCase tests: + + - netbox-branching's get_tables_to_replicate() iterates over registered + models to build the list of tables to clone into the branch schema. + A stale model entry makes it try to COPY a table that no longer exists. + - Django's cascade-delete collector queries non-existent tables. + + Calling this in setUp() of every TransactionTestCase prevents both failure + modes. + """ + stale = [ + name + for name, model in list(django_apps.all_models.get(APP_LABEL, {}).items()) + if getattr(model, '_generated_table_model', False) + ] + for name in stale: + django_apps.all_models[APP_LABEL].pop(name, None) + if stale: + django_apps.clear_cache() + + class TransactionCleanupMixin: """Mixin for TransactionTestCase subclasses that create CustomObjectType instances. @@ -14,14 +57,45 @@ class TransactionCleanupMixin: database flush that TransactionTestCase performs between tests. """ + def setUp(self): + # Purge stale in-memory model registrations left by earlier TestCase + # classes whose rolled-back transactions dropped the backing tables. + # Must run before any code that iterates the model registry (e.g. + # netbox-branching's get_tables_to_replicate() during provisioning). + _purge_stale_generated_models() + super().setUp() + def tearDown(self): + from core.models import ObjectChange + from netbox_custom_objects.models import _deferred_co_field_data + # Reset deferred CO field data so it doesn't bleed into the next test. + _deferred_co_field_data.set(None) + # Delete COTs and their backing tables before the DB flush. for cot in CustomObjectType.objects.all(): try: cot.delete() except Exception as exc: print(f"WARNING: tearDown could not delete COT {cot.pk}: {exc}") + # Remove any ObjectChange records created during the test (merge/revert creates + # them in main with the test user's ID). If left in place, the serialized_rollback + # snapshot accumulates them and restoring it after the next flush produces FK + # violations (user referenced by ObjectChange no longer exists). + ObjectChange.objects.all().delete() super().tearDown() + def _fixture_teardown(self): + """Flush tables and restore ContentTypes for the next test class. + + TransactionTestCase._fixture_teardown() TRUNCATEs all tables after each + test. Any TestCase class that follows on the same worker then finds no + ContentTypes and fails trying to look up ObjectType rows. Recreating + them here (idempotently, via get_or_create) avoids that without the + duplicate-key violations that serialized_rollback=True causes when the + parallel runner tries to INSERT rows that already exist. + """ + super()._fixture_teardown() + _recreate_contenttypes() + class CustomObjectsTestCase: """ diff --git a/netbox_custom_objects/tests/test_branching.py b/netbox_custom_objects/tests/test_branching.py new file mode 100644 index 00000000..88309f8c --- /dev/null +++ b/netbox_custom_objects/tests/test_branching.py @@ -0,0 +1,1398 @@ +""" +Branching integration tests for netbox-custom-objects. + +Requires netbox-branching to be installed alongside this plugin. All tests +are skipped when netbox-branching is absent so the suite remains clean in +environments that don't use branching. + +These tests use TransactionTestCase (not TestCase) because branch schemas live +in separate PostgreSQL schemas backed by distinct database connections that +cannot be rolled back inside a single SAVEPOINT-based transaction. +""" +import datetime +import decimal +import time +import unittest +import uuid + +from django.contrib.auth import get_user_model +from django.db import connections +from django.test import RequestFactory, TransactionTestCase +from django.urls import reverse + +try: + from netbox.context_managers import event_tracking + from netbox_branching.choices import BranchStatusChoices + from netbox_branching.models import Branch + from netbox_branching.utilities import activate_branch + HAS_BRANCHING = True +except ImportError: + HAS_BRANCHING = False + +from netbox_custom_objects.models import CustomObjectType, CustomObjectTypeField +from netbox_custom_objects.tests.base import TransactionCleanupMixin, _recreate_contenttypes + +User = get_user_model() + + +def _make_request(user): + """Return a fresh request object suitable for event_tracking.""" + request = RequestFactory().get(reverse('home')) + request.id = uuid.uuid4() + request.user = user + return request + + +def _provision_branch(name, merge_strategy, user): + """Create and wait for a branch to reach READY status (up to 30 s).""" + branch = Branch(name=name, merge_strategy=merge_strategy) + branch.save(provision=False) + branch.provision(user=user) + deadline = time.time() + 30 + while time.time() < deadline: + branch.refresh_from_db() + if branch.status == BranchStatusChoices.READY: + return branch + time.sleep(0.1) + raise TimeoutError( + f'Branch {name!r} did not reach READY within 30 s ' + f'(status={branch.status!r})' + ) + + +def _close_branch_connections(): + """Close any open branch database connections.""" + for branch in Branch.objects.all(): + try: + connections[branch.connection_name].close() + except Exception: + pass + + +# ── Shared merge/revert tests (strategy-agnostic) ──────────────────────────── + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class BaseBranchingTests(TransactionCleanupMixin): + """ + Merge and revert tests that run against every merge strategy. + + Subclasses must: + - set ``MERGE_STRATEGY`` to an iterative or squash strategy string + - also inherit from ``TransactionTestCase`` + + Example:: + + class IterativeBranchingTestCase(BaseBranchingTests, TransactionTestCase): + MERGE_STRATEGY = 'iterative' + """ + + MERGE_STRATEGY = None + + def setUp(self): + super().setUp() # → TransactionCleanupMixin.setUp() → _purge_stale_generated_models() + _recreate_contenttypes() + self.user = User.objects.create_user(username='testuser') + self.request = _make_request(self.user) + + def tearDown(self): + _close_branch_connections() + super().tearDown() # → TransactionCleanupMixin → TransactionTestCase + + # ── simple: one COT, one text field, one CO ─────────────────────────── + + def test_simple_merge_and_revert(self): + """ + Create a COT with a single text field and one custom object instance + inside a branch. Merge to main, then revert. + + Assertions + ---------- + Before merge + - COT is absent from main + - field is absent from main + + After merge + - COT is present in main + - field is present in main + - get() on the CO in main returns the correct field value + + After revert + - COT is absent from main + - field is absent from main + """ + branch = _provision_branch('Simple Branch', self.MERGE_STRATEGY, self.user) + request = _make_request(self.user) + + # ── create inside branch ────────────────────────────────────────── + with activate_branch(branch), event_tracking(request): + cot = CustomObjectType.objects.create(name='simple_cot', slug='simple-cot') + field = CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='notes', + label='Notes', + type='text', + ) + Model = cot.get_model() + co = Model.objects.create(notes='hello from branch') + + cot_pk, field_pk, co_pk = cot.pk, field.pk, co.pk + + # ── before merge: nothing in main ───────────────────────────────── + self.assertFalse( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must not be visible in main before merge', + ) + self.assertFalse( + CustomObjectTypeField.objects.filter(pk=field_pk).exists(), + 'Field must not be visible in main before merge', + ) + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + # ── after merge: present in main ────────────────────────────────── + self.assertTrue( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must be in main after merge', + ) + self.assertTrue( + CustomObjectTypeField.objects.filter(pk=field_pk).exists(), + 'Field must be in main after merge', + ) + + # get() the CO and verify its value + cot_main = CustomObjectType.objects.get(pk=cot_pk) + co_main = cot_main.get_model().objects.get(pk=co_pk) + self.assertEqual(co_main.notes, 'hello from branch') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + branch.refresh_from_db() + + # ── after revert: gone from main ────────────────────────────────── + self.assertFalse( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must not be in main after revert', + ) + self.assertFalse( + CustomObjectTypeField.objects.filter(pk=field_pk).exists(), + 'Field must not be in main after revert', + ) + + # ── comprehensive: one of every field type ──────────────────────────── + + def test_comprehensive_merge_and_revert(self): + """ + Create a COT with one field of each supported type, plus a CO instance, + inside a branch. Merge to main, then revert. + + Field types + ----------- + text — plain VARCHAR column + integer — INTEGER column + decimal — DECIMAL column (exercises numeric precision handling) + boolean — BOOLEAN column + datetime — TIMESTAMPTZ column (exercises timezone-aware handling) + select — VARCHAR column with a ChoiceSet + object — ForeignKey column (to dcim.Site) + multiobject — M2M through-table (to dcim.Site) + + This exercises every distinct schema-editor operation + (add_field, add_FK, create_through_table) across a merge cycle and + verifies that all field values survive the round-trip. + + Assertions mirror test_simple_merge_and_revert but for every field type. + """ + from core.models import ObjectType + from dcim.models import Site + from extras.models import CustomFieldChoiceSet + + # The Site is created in main before provisioning so it exists in both + # main and the branch schema and is valid as an FK target during merge. + with event_tracking(self.request): + site = Site.objects.create(name='Reference Site', slug='reference-site') + + branch = _provision_branch('Comprehensive Branch', self.MERGE_STRATEGY, self.user) + request = _make_request(self.user) + + site_ot = ObjectType.objects.get(app_label='dcim', model='site') + cot_pk = None + field_pks = {} + co_pk = None + + test_dt = datetime.datetime(2024, 6, 15, 12, 0, 0, tzinfo=datetime.timezone.utc) + test_decimal = decimal.Decimal('3.14') + + # ── create inside branch ────────────────────────────────────────── + with activate_branch(branch), event_tracking(request): + choice_set = CustomFieldChoiceSet.objects.create( + name='Statuses', + extra_choices=[['active', 'Active'], ['inactive', 'Inactive']], + ) + cot = CustomObjectType.objects.create(name='full_cot', slug='full-cot') + cot_pk = cot.pk + + field_specs = [ + ('text_field', {'type': 'text'}), + ('int_field', {'type': 'integer'}), + ('dec_field', {'type': 'decimal'}), + ('bool_field', {'type': 'boolean'}), + ('dt_field', {'type': 'datetime'}), + ('select_field', {'type': 'select', 'choice_set': choice_set}), + ('obj_field', {'type': 'object', 'related_object_type': site_ot}), + ('multi_field', {'type': 'multiobject', 'related_object_type': site_ot}), + ] + for name, kwargs in field_specs: + f = CustomObjectTypeField.objects.create( + custom_object_type=cot, + name=name, + label=name.replace('_', ' ').title(), + **kwargs, + ) + field_pks[name] = f.pk + + Model = cot.get_model() + co = Model.objects.create( + text_field='hello', + int_field=42, + dec_field=test_decimal, + bool_field=True, + dt_field=test_dt, + select_field='active', + obj_field_id=site.pk, + # multi_field left empty — through-table creation is still exercised + ) + co_pk = co.pk + + # ── before merge: nothing in main ───────────────────────────────── + self.assertFalse( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must not be in main before merge', + ) + for name, pk in field_pks.items(): + self.assertFalse( + CustomObjectTypeField.objects.filter(pk=pk).exists(), + f'Field {name!r} must not be in main before merge', + ) + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + # ── after merge: present in main, values intact ─────────────────── + self.assertTrue( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must be in main after merge', + ) + for name, pk in field_pks.items(): + self.assertTrue( + CustomObjectTypeField.objects.filter(pk=pk).exists(), + f'Field {name!r} must be in main after merge', + ) + + cot_main = CustomObjectType.objects.get(pk=cot_pk) + co_main = cot_main.get_model().objects.get(pk=co_pk) + self.assertEqual(co_main.text_field, 'hello') + self.assertEqual(co_main.int_field, 42) + self.assertEqual(co_main.dec_field, test_decimal) + self.assertTrue(co_main.bool_field) + self.assertEqual(co_main.dt_field, test_dt) + self.assertEqual(co_main.select_field, 'active') + self.assertEqual(co_main.obj_field_id, site.pk) + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + branch.refresh_from_db() + + # ── after revert: gone from main ────────────────────────────────── + self.assertFalse( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must not be in main after revert', + ) + for name, pk in field_pks.items(): + self.assertFalse( + CustomObjectTypeField.objects.filter(pk=pk).exists(), + f'Field {name!r} must not be in main after revert', + ) + + # ── object modified inside branch ───────────────────────────────────── + + def test_object_modified_merge_and_revert(self): + """ + CO that exists in main is modified inside a branch. Merge brings the + new value to main; revert restores the original. + + The COT and field are created in main before provisioning so the branch + has a full schema copy. Only the CO data changes inside the branch. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='modify_cot', slug='modify-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='notes', + label='Notes', + type='text', + ) + Model = cot.get_model() + co = Model.objects.create(notes='original value') + + co_pk = co.pk + branch = _provision_branch('Modify Branch', self.MERGE_STRATEGY, self.user) + branch_request = _make_request(self.user) + + # Modify CO inside the branch. + with activate_branch(branch), event_tracking(branch_request): + branch_co = cot.get_model().objects.get(pk=co_pk) + branch_co.snapshot() # captures pre-change state for ObjectChange.diff()['pre'] during revert + branch_co.notes = 'modified in branch' + branch_co.save() + + # Main must not see the modification yet. + co.refresh_from_db() + self.assertEqual(co.notes, 'original value', 'Main must not see branch modification before merge') + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + co.refresh_from_db() + self.assertEqual(co.notes, 'modified in branch', 'Main must see modified value after merge') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + co.refresh_from_db() + self.assertEqual(co.notes, 'original value', 'Main must have original value after revert') + + # ── object deleted inside branch ────────────────────────────────────── + + def test_object_deleted_merge_and_revert(self): + """ + CO that exists in main is deleted inside a branch. Merge removes it + from main; revert restores it. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='delete_cot', slug='delete-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='notes', + label='Notes', + type='text', + ) + Model = cot.get_model() + co = Model.objects.create(notes='will be deleted') + + co_pk = co.pk + branch = _provision_branch('Delete Branch', self.MERGE_STRATEGY, self.user) + branch_request = _make_request(self.user) + + with activate_branch(branch), event_tracking(branch_request): + cot.get_model().objects.get(pk=co_pk).delete() + + # CO must still exist in main before merge. + self.assertTrue( + cot.get_model().objects.filter(pk=co_pk).exists(), + 'CO must still exist in main before merge', + ) + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + self.assertFalse( + cot.get_model().objects.filter(pk=co_pk).exists(), + 'CO must be deleted from main after merge', + ) + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + self.assertTrue( + cot.get_model().objects.filter(pk=co_pk).exists(), + 'CO must be restored in main after revert', + ) + + # ── field renamed inside branch → merge ─────────────────────────────── + + def test_field_rename_merge_and_revert(self): + """ + Field created and then renamed inside a branch. Merge brings the COT + with the renamed column to main; revert removes it. + + Exercises _schema_alter_field via the merge deserialization path using + PK-based rename detection (same PK, different name values). + """ + branch = _provision_branch('Rename Branch', self.MERGE_STRATEGY, self.user) + request = _make_request(self.user) + + with activate_branch(branch), event_tracking(request): + cot = CustomObjectType.objects.create(name='rename_cot', slug='rename-cot') + field = CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='old_name', + label='Old Name', + type='text', + ) + # Load from DB so _original is set, then rename. + field = CustomObjectTypeField.objects.get(pk=field.pk) + field.snapshot() # captures pre-change state for ObjectChange.diff()['pre'] during revert + field.name = 'new_name' + field.label = 'New Name' + field.save() + Model = cot.get_model() + co = Model.objects.create(new_name='value after rename') + + field_pk, cot_pk, co_pk = field.pk, cot.pk, co.pk + + self.assertFalse( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must not exist in main before merge', + ) + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + self.assertTrue(CustomObjectTypeField.objects.filter(pk=field_pk).exists()) + field_main = CustomObjectTypeField.objects.get(pk=field_pk) + self.assertEqual(field_main.name, 'new_name', 'Field must have new name in main after merge') + + cot_main = CustomObjectType.objects.get(pk=cot_pk) + co_main = cot_main.get_model().objects.get(pk=co_pk) + self.assertEqual(co_main.new_name, 'value after rename') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + self.assertFalse(CustomObjectType.objects.filter(pk=cot_pk).exists()) + self.assertFalse(CustomObjectTypeField.objects.filter(pk=field_pk).exists()) + + # ── unique constraint toggled inside branch → merge ─────────────────── + + def test_field_unique_toggle_merge_and_revert(self): + """ + Field created without a unique constraint inside a branch, then + toggled to unique=True. Merge brings the COT with the UNIQUE + constraint to main; revert removes it. + + Exercises alter_field for constraint-only changes via the merge path. + """ + from django.db import connection as main_conn + + branch = _provision_branch('Unique Branch', self.MERGE_STRATEGY, self.user) + request = _make_request(self.user) + + with activate_branch(branch), event_tracking(request): + cot = CustomObjectType.objects.create(name='unique_cot', slug='unique-cot') + field = CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='code', + label='Code', + type='text', + unique=False, + ) + # Load from DB so _original is set, then enable unique. + field = CustomObjectTypeField.objects.get(pk=field.pk) + field.snapshot() # captures pre-change state for ObjectChange.diff()['pre'] during revert + field.unique = True + field.save() + Model = cot.get_model() + co_pk = Model.objects.create(code='ABC').pk + + field_pk, cot_pk = field.pk, cot.pk + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + field_main = CustomObjectTypeField.objects.get(pk=field_pk) + self.assertTrue(field_main.unique, 'Field must have unique=True in main after merge') + + # Verify the UNIQUE constraint exists in main's physical schema. + cot_main = CustomObjectType.objects.get(pk=cot_pk) + table_name = cot_main.get_database_table_name() + with main_conn.cursor() as cursor: + constraints = main_conn.introspection.get_constraints(cursor, table_name) + self.assertTrue( + any(c['unique'] and c.get('columns') == ['code'] for c in constraints.values()), + 'UNIQUE constraint on "code" must exist in main schema after merge', + ) + + # CO with code='ABC' must have survived the merge. + cot_main2 = CustomObjectType.objects.get(pk=cot_pk) + self.assertTrue(cot_main2.get_model().objects.filter(pk=co_pk).exists()) + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + self.assertFalse(CustomObjectType.objects.filter(pk=cot_pk).exists()) + + # ── non-schema field attributes inside branch → merge ───────────────── + + def test_field_non_schema_attrs_merge_and_revert(self): + """ + Field attributes that do not affect the physical schema (label, + primary, required, description) survive a branch merge and revert + without causing schema errors or spurious ALTER TABLE calls. + + These attributes are excluded from _field_schema_key and exist only + at the application layer. This test confirms that altering them in + a branch and merging correctly updates the ORM-level field record in + main without touching the DB column definition. + """ + branch = _provision_branch('Attrs Branch', self.MERGE_STRATEGY, self.user) + request = _make_request(self.user) + + with activate_branch(branch), event_tracking(request): + cot = CustomObjectType.objects.create(name='attrs_cot', slug='attrs-cot') + field = CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='title', + label='Title', + type='text', + primary=False, + required=False, + description='original description', + ) + # Load from DB so _original is set, then mutate non-schema attrs. + field = CustomObjectTypeField.objects.get(pk=field.pk) + field.snapshot() # captures pre-change state for ObjectChange.diff()['pre'] during revert + field.label = 'Updated Title' + field.primary = True + field.required = True + field.description = 'updated description' + field.save() + + field_pk, cot_pk = field.pk, cot.pk + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + field_main = CustomObjectTypeField.objects.get(pk=field_pk) + self.assertEqual(field_main.label, 'Updated Title') + self.assertTrue(field_main.primary) + self.assertTrue(field_main.required) + self.assertEqual(field_main.description, 'updated description') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + self.assertFalse(CustomObjectType.objects.filter(pk=cot_pk).exists()) + self.assertFalse(CustomObjectTypeField.objects.filter(pk=field_pk).exists()) + + +# ── Concrete test classes (one per merge strategy) ──────────────────────────── + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class IterativeBranchingTestCase(BaseBranchingTests, TransactionTestCase): + """Run BaseBranchingTests with the iterative merge strategy.""" + MERGE_STRATEGY = 'iterative' + + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class SquashBranchingTestCase(BaseBranchingTests, TransactionTestCase): + """Run BaseBranchingTests with the squash merge strategy.""" + MERGE_STRATEGY = 'squash' + + +# ── Sync test ───────────────────────────────────────────────────────────────── + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class BranchSyncTestCase(TransactionCleanupMixin, TransactionTestCase): + """ + Test that objects created in main after a branch is provisioned are not + visible in the branch until the branch is synced, and are correctly + available in the branch after sync. + """ + + def setUp(self): + super().setUp() # → TransactionCleanupMixin.setUp() → _purge_stale_generated_models() + _recreate_contenttypes() + self.user = User.objects.create_user(username='testuser') + self.request = _make_request(self.user) + + def tearDown(self): + _close_branch_connections() + super().tearDown() # → TransactionCleanupMixin → TransactionTestCase + + def test_main_changes_synced_to_branch(self): + """ + A COT, field, and CO created in main *after* a branch is provisioned + must not appear in the branch before sync. After branch.sync() they + must be present and the CO must be retrievable. + + Scenario + -------- + 1. Provision branch (no COT exists yet). + 2. Create COT, field, and CO in main. + 3. Assert they are absent from the branch. + 4. sync() the branch. + 5. Assert they are present in the branch and the CO value is correct. + """ + branch = _provision_branch('Sync Branch', 'iterative', self.user) + request = _make_request(self.user) + + # ── create COT, field, CO in main ───────────────────────────────── + with event_tracking(request): + cot = CustomObjectType.objects.create(name='main_cot', slug='main-cot') + field = CustomObjectTypeField.objects.create( + custom_object_type=cot, + name='title', + label='Title', + type='text', + ) + Model = cot.get_model() + co = Model.objects.create(title='main object') + + cot_pk, field_pk, co_pk = cot.pk, field.pk, co.pk + + # ── before sync: absent from branch ─────────────────────────────── + with activate_branch(branch): + self.assertFalse( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must not be in branch before sync', + ) + self.assertFalse( + CustomObjectTypeField.objects.filter(pk=field_pk).exists(), + 'Field must not be in branch before sync', + ) + + # ── sync ────────────────────────────────────────────────────────── + branch.sync(user=self.user, commit=True) + branch.refresh_from_db() + + # ── after sync: present in branch ───────────────────────────────── + with activate_branch(branch): + self.assertTrue( + CustomObjectType.objects.filter(pk=cot_pk).exists(), + 'COT must be in branch after sync', + ) + self.assertTrue( + CustomObjectTypeField.objects.filter(pk=field_pk).exists(), + 'Field must be in branch after sync', + ) + cot_branch = CustomObjectType.objects.get(pk=cot_pk) + BranchModel = cot_branch.get_model() + co_branch = BranchModel.objects.get(pk=co_pk) + self.assertEqual(co_branch.title, 'main object') + + +# ── Concurrent-edit tests (both main and branch modified before sync/merge) ─── + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class ConcurrentEditSyncTestCase(TransactionCleanupMixin, TransactionTestCase): + """ + Sync scenarios where both main and branch accumulate changes before sync(). + + Mirrors netbox-branching's test_sync_m2m_tags_concurrent_changes pattern: + after sync(), main's ObjectChanges are applied on top of whatever the branch + did, so main's post-change state takes precedence for any conflicting record. + """ + + def setUp(self): + super().setUp() + _recreate_contenttypes() + self.user = User.objects.create_user(username='testuser') + self.request = _make_request(self.user) + + def tearDown(self): + _close_branch_connections() + super().tearDown() + + def test_co_values_modified_in_both_sync(self): + """ + CO field values modified in both main and branch before sync. + + Scenario + -------- + 1. Create COT + field 'notes' + two COs in main. + 2. Provision branch (branch sees same COs). + 3. In branch: update shared CO to 'modified in branch'. + 4. In main: update a different CO; create a brand-new CO. + 5. sync() applies main's ObjectChanges to the branch. + + Expected after sync + ------------------- + - Main's new CO is visible in the branch. + - CO modified in main has main's value in the branch. + - Branch's own CO modification is overwritten by main's replay for + the same PK (main wins on sync, same as tag-conflict behaviour). + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='sync_co_cot', slug='sync-co-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='notes', label='Notes', type='text', + ) + Model = cot.get_model() + co_shared = Model.objects.create(notes='original shared') + co_main_only = Model.objects.create(notes='main only original') + + co_shared_pk = co_shared.pk + co_main_only_pk = co_main_only.pk + branch = _provision_branch('Sync CO Both', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch: update the shared CO ───────────────────────────────── + with activate_branch(branch), event_tracking(branch_request): + BM = cot.get_model() + co = BM.objects.get(pk=co_shared_pk) + co.snapshot() + co.notes = 'modified in branch' + co.save() + branch_new = BM.objects.create(notes='new in branch') + branch_new_pk = branch_new.pk + + # ── main: update the other CO; add a new CO ─────────────────────── + with event_tracking(request): + MM = cot.get_model() + co = MM.objects.get(pk=co_main_only_pk) + co.snapshot() + co.notes = 'modified in main' + co.save() + main_new = MM.objects.create(notes='new in main') + main_new_pk = main_new.pk + + # ── sync ────────────────────────────────────────────────────────── + branch.sync(user=self.user, commit=True) + + with activate_branch(branch): + SyncedCOT = CustomObjectType.objects.get(pk=cot.pk) + SM = SyncedCOT.get_model() + + # CO modified in main must reflect main's value. + self.assertEqual(SM.objects.get(pk=co_main_only_pk).notes, 'modified in main') + # CO created in main must be visible in branch after sync. + self.assertTrue(SM.objects.filter(pk=main_new_pk).exists(), + 'CO created in main must appear in branch after sync') + # CO created in branch was not deleted by sync. + self.assertTrue(SM.objects.filter(pk=branch_new_pk).exists(), + 'CO created in branch must still exist after sync') + + def test_field_rename_in_branch_co_add_in_main_sync(self): + """ + Field renamed inside a branch; new CO added in main (no rename in main). + + Scenario + -------- + 1. Create COT + field 'alpha' + CO in main. + 2. Provision branch. + 3. In branch: rename 'alpha' → 'branch_alpha'; create a CO. + 4. In main: create a new CO using the original field name 'alpha'. + 5. sync() replays main's CO-create on top of the branch. + + Expected after sync + ------------------- + - Branch retains the renamed field ('branch_alpha'). + - CO created in main is present in branch. + - CO created in branch still exists. + + This exercises the schema-mismatch path: main's CO ObjectChange carries + the original field key 'alpha' while the branch schema uses 'branch_alpha'. + The CO data from main is applied by the branching engine regardless of the + column name divergence (branching replays at the data layer). + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='sync_rename_cot', slug='sync-rename-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='alpha', label='Alpha', type='text', + ) + cot.get_model().objects.create(alpha='original') + + branch = _provision_branch('Sync Rename Branch', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch: rename alpha → branch_alpha; create CO ──────────────── + with activate_branch(branch), event_tracking(branch_request): + field = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + field.snapshot() + field.name = 'branch_alpha' + field.label = 'Branch Alpha' + field.save() + BM = cot.get_model() + branch_new = BM.objects.create(branch_alpha='new in branch') + branch_new_pk = branch_new.pk + + # ── main: add a CO (no field rename) ────────────────────────────── + with event_tracking(request): + cot.get_model().objects.create(alpha='new in main') + + # ── sync ────────────────────────────────────────────────────────── + branch.sync(user=self.user, commit=True) + + with activate_branch(branch): + cot_b = CustomObjectType.objects.get(pk=cot.pk) + field_b = CustomObjectTypeField.objects.get(custom_object_type=cot_b) + # Branch field rename must be preserved (main did not rename). + self.assertEqual(field_b.name, 'branch_alpha', + 'Branch field rename must be preserved after sync') + BM = cot_b.get_model() + self.assertTrue(BM.objects.filter(pk=branch_new_pk).exists(), + 'CO created in branch must survive sync') + + def test_concurrent_field_rename_sync_no_crash(self): + """ + Field renamed to different names in both main and branch before sync. + + The same CustomObjectTypeField PK was modified in both schemas. + _schema_alter_field detects the conflict (neither the original 'alpha' + column nor the target 'main_alpha' column exists in the branch), looks up + the live column name in the branch ('branch_alpha'), and renames it to + 'main_alpha' to converge the branch schema on main's post-sync state. + + Scenario + -------- + 1. Create COT + field 'alpha' in main. + 2. Provision branch. + 3. Branch renames 'alpha' → 'branch_alpha'. + 4. Main renames 'alpha' → 'main_alpha'. + 5. sync() applies main's rename ObjectChange to the branch. + _schema_alter_field resolves the conflict: 'branch_alpha' → 'main_alpha'. + + Expected + -------- + - sync() completes without raising a DB error. + - Branch physical column is 'main_alpha' (converged to main's rename). + - 'branch_alpha' and 'alpha' columns are absent from the branch table. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='confl_sync_cot', slug='confl-sync-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='alpha', label='Alpha', type='text', + ) + + branch = _provision_branch('Conflict Sync Branch', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch: rename alpha → branch_alpha ─────────────────────────── + with activate_branch(branch), event_tracking(branch_request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + f.snapshot() + f.name = 'branch_alpha' + f.label = 'Branch Alpha' + f.save() + + # ── main: rename alpha → main_alpha ─────────────────────────────── + with event_tracking(request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + f.name = 'main_alpha' + f.label = 'Main Alpha' + f.save() + + # ── sync — must not raise ────────────────────────────────────────── + try: + branch.sync(user=self.user, commit=True) + except Exception as exc: + self.fail(f'sync() raised an unexpected exception: {exc!r}') + + branch.refresh_from_db() + + # Branch column should now be 'main_alpha' — the conflict was resolved by + # renaming the branch's live 'branch_alpha' column to main's target name. + branch_conn = connections[branch.connection_name] + with branch_conn.cursor() as cursor: + branch_cols = { + col.name + for col in branch_conn.introspection.get_table_description( + cursor, cot.get_database_table_name(), + ) + } + self.assertIn('main_alpha', branch_cols, 'Branch column must converge to main_alpha after sync') + self.assertNotIn('branch_alpha', branch_cols, 'branch_alpha column must be gone after sync') + self.assertNotIn('alpha', branch_cols, 'alpha column must be gone after sync') + + +# ── Concurrent-edit merge tests ─────────────────────────────────────────────── + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class ConcurrentEditMergeTestCase(TransactionCleanupMixin, TransactionTestCase): + """ + Merge scenarios where both main and branch accumulate changes before merge(). + """ + + def setUp(self): + super().setUp() + _recreate_contenttypes() + self.user = User.objects.create_user(username='testuser') + self.request = _make_request(self.user) + + def tearDown(self): + _close_branch_connections() + super().tearDown() + + def test_field_rename_in_branch_co_changes_merge_iterative(self): + """ + Field renamed inside a branch; COs added/updated in branch; main adds a CO. + Iterative merge brings the branch rename and CO changes into main. + + Scenario + -------- + 1. Create COT + field 'alpha' + CO in main. + 2. Provision branch. + 3. In branch: rename 'alpha' → 'beta'; update the existing CO; create a CO. + 4. In main: add a CO (field name 'alpha', no rename). + 5. merge() → revert(). + + Expected after merge + -------------------- + - Field name in main is 'beta'. + - Existing CO has the branch-updated value. + - CO created in branch is present in main. + + Expected after revert + --------------------- + - Field name is 'alpha' again. + - CO created in branch is gone. + - Existing CO has its original value. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='merge_rename_cot', slug='merge-rename-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='alpha', label='Alpha', type='text', + ) + Model = cot.get_model() + co_existing = Model.objects.create(alpha='original') + + co_existing_pk = co_existing.pk + branch = _provision_branch('Merge Rename Branch', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch: rename; update CO; create CO ────────────────────────── + with activate_branch(branch), event_tracking(branch_request): + field = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + field.snapshot() + field.name = 'beta' + field.label = 'Beta' + field.save() + BM = cot.get_model() + co = BM.objects.get(pk=co_existing_pk) + co.snapshot() + co.beta = 'updated in branch' + co.save() + branch_new = BM.objects.create(beta='new in branch') + branch_new_pk = branch_new.pk + + # ── main: add a CO (no schema change) ───────────────────────────── + with event_tracking(request): + main_new = cot.get_model().objects.create(alpha='new in main') + main_new_pk = main_new.pk + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + MergedModel = cot.get_model() + field_main = CustomObjectTypeField.objects.get(custom_object_type=cot) + self.assertEqual(field_main.name, 'beta', 'Field must be "beta" in main after merge') + self.assertEqual(MergedModel.objects.get(pk=co_existing_pk).beta, 'updated in branch') + self.assertTrue(MergedModel.objects.filter(pk=branch_new_pk).exists(), + 'CO created in branch must be in main after merge') + self.assertTrue(MergedModel.objects.filter(pk=main_new_pk).exists(), + 'CO added in main must still be present after merge') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + field_reverted = CustomObjectTypeField.objects.get(custom_object_type=cot) + self.assertEqual(field_reverted.name, 'alpha', 'Field must be "alpha" after revert') + RevertedModel = cot.get_model() + self.assertEqual(RevertedModel.objects.get(pk=co_existing_pk).alpha, 'original') + self.assertFalse(RevertedModel.objects.filter(pk=branch_new_pk).exists(), + 'CO created in branch must be gone after revert') + + def test_co_values_modified_in_both_merge_iterative(self): + """ + CO values modified in both main and branch before merge. + Branch changes win because merge applies branch ObjectChanges to main. + + Scenario + -------- + 1. Create COT + field 'notes' + shared CO in main. + 2. Provision branch. + 3. Branch updates shared CO to 'modified in branch'; creates a CO. + 4. Main creates a separate CO. + 5. merge() → revert(). + + Expected after merge: branch CO changes are in main, main CO preserved. + Expected after revert: shared CO back to original, branch CO gone. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='merge_co_cot', slug='merge-co-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='notes', label='Notes', type='text', + ) + Model = cot.get_model() + co_shared = Model.objects.create(notes='original') + + co_shared_pk = co_shared.pk + branch = _provision_branch('Merge CO Both', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch ──────────────────────────────────────────────────────── + with activate_branch(branch), event_tracking(branch_request): + BM = cot.get_model() + co = BM.objects.get(pk=co_shared_pk) + co.snapshot() + co.notes = 'modified in branch' + co.save() + branch_new = BM.objects.create(notes='new in branch') + branch_new_pk = branch_new.pk + + # ── main ────────────────────────────────────────────────────────── + with event_tracking(request): + main_new = cot.get_model().objects.create(notes='new in main') + main_new_pk = main_new.pk + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + MM = cot.get_model() + self.assertEqual(MM.objects.get(pk=co_shared_pk).notes, 'modified in branch') + self.assertTrue(MM.objects.filter(pk=branch_new_pk).exists()) + self.assertTrue(MM.objects.filter(pk=main_new_pk).exists(), + 'CO added to main must survive the merge') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + RM = cot.get_model() + self.assertEqual(RM.objects.get(pk=co_shared_pk).notes, 'original') + self.assertFalse(RM.objects.filter(pk=branch_new_pk).exists()) + + +# ── Sequential multi-rename tests ───────────────────────────────────────────── + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class SequentialRenameTestCase(TransactionCleanupMixin, TransactionTestCase): + """ + Tests for sequential field renames (A→B→C) in a branch with CO changes at + each step, plus independent changes in main. + + Exercises the iterative ObjectChange replay order: the rename chain must be + applied in the right sequence so that each CO update sees the correct column + name at merge time. + + Run with both iterative and squash strategies to verify that squash correctly + collapses the A→B→C chain to a single A→C alter. + """ + + MERGE_STRATEGY = 'iterative' + + def setUp(self): + super().setUp() + _recreate_contenttypes() + self.user = User.objects.create_user(username='testuser') + self.request = _make_request(self.user) + + def tearDown(self): + _close_branch_connections() + super().tearDown() + + def _run_sequential_rename_merge(self, cot_name, cot_slug): + """ + Shared implementation for the sequential rename merge test. + + Branch: rename alpha→beta (update+create CO), rename beta→gamma (update+create CO). + Main: add a new independent field + CO (no rename of alpha). + merge() then revert(). + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name=cot_name, slug=cot_slug) + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='alpha', label='Alpha', type='text', + ) + Model = cot.get_model() + co_original = Model.objects.create(alpha='original value') + + co_original_pk = co_original.pk + branch = _provision_branch(f'{cot_name} branch', self.MERGE_STRATEGY, self.user) + branch_request = _make_request(self.user) + + # ── branch: alpha → beta; update CO; create CO ──────────────────── + with activate_branch(branch), event_tracking(branch_request): + field = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + field.snapshot() + field.name = 'beta' + field.label = 'Beta' + field.save() + BM = cot.get_model() + co = BM.objects.get(pk=co_original_pk) + co.snapshot() + co.beta = 'after rename to beta' + co.save() + co_at_beta = BM.objects.create(beta='created at beta') + co_at_beta_pk = co_at_beta.pk + + # ── branch: beta → gamma; update CO; create CO ──────────────────── + with activate_branch(branch), event_tracking(branch_request): + field = CustomObjectTypeField.objects.get(custom_object_type=cot, name='beta') + field.snapshot() + field.name = 'gamma' + field.label = 'Gamma' + field.save() + BM = cot.get_model() + co = BM.objects.get(pk=co_original_pk) + co.snapshot() + co.gamma = 'after rename to gamma' + co.save() + co_at_gamma = BM.objects.create(gamma='created at gamma') + co_at_gamma_pk = co_at_gamma.pk + + # ── main: add a new independent field + CO ──────────────────────── + with event_tracking(request): + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='extra', label='Extra', type='text', + ) + co_main = cot.get_model().objects.create(alpha='main added', extra='extra val') + co_main_pk = co_main.pk + + # ── merge ───────────────────────────────────────────────────────── + branch.merge(user=self.user, commit=True) + branch.refresh_from_db() + self.assertEqual(branch.status, BranchStatusChoices.MERGED) + + field_names = {f.name for f in CustomObjectTypeField.objects.filter(custom_object_type=cot)} + self.assertIn('gamma', field_names, 'Final field name must be "gamma" after merge') + self.assertNotIn('alpha', field_names, '"alpha" must be absent after merge') + self.assertNotIn('beta', field_names, '"beta" must be absent after merge') + self.assertIn('extra', field_names, '"extra" field from main must be present after merge') + + MergedModel = cot.get_model() + self.assertEqual(MergedModel.objects.get(pk=co_original_pk).gamma, 'after rename to gamma') + self.assertTrue(MergedModel.objects.filter(pk=co_at_beta_pk).exists(), + 'CO created at beta step must survive merge') + self.assertTrue(MergedModel.objects.filter(pk=co_at_gamma_pk).exists(), + 'CO created at gamma step must survive merge') + self.assertTrue(MergedModel.objects.filter(pk=co_main_pk).exists(), + 'CO added in main must survive merge') + + # ── revert ──────────────────────────────────────────────────────── + branch.revert(user=self.user, commit=True) + + field_names_r = {f.name for f in CustomObjectTypeField.objects.filter(custom_object_type=cot)} + self.assertIn('alpha', field_names_r, '"alpha" must be restored after revert') + self.assertNotIn('gamma', field_names_r, '"gamma" must be gone after revert') + + RevertedModel = cot.get_model() + self.assertEqual(RevertedModel.objects.get(pk=co_original_pk).alpha, 'original value', + 'Original CO value must be restored after revert') + self.assertFalse(RevertedModel.objects.filter(pk=co_at_beta_pk).exists()) + self.assertFalse(RevertedModel.objects.filter(pk=co_at_gamma_pk).exists()) + + def test_sequential_renames_alpha_beta_gamma_merge(self): + """Field renamed A→B→C in branch with CO changes at each step; merge + revert.""" + self._run_sequential_rename_merge('seq_iter_cot', 'seq-iter-cot') + + def test_sequential_renames_both_sides_sync(self): + """ + Branch renames A→B→C while main renames A→D. + Both schemas independently rename the same field to different names. + + After sync(), main's rename (A→D) is applied on top of the branch's + state. Because 'alpha' no longer exists in the branch (it was renamed + to 'gamma' via beta), _schema_alter_field detects the conflict, looks up + the live column name in the branch ('gamma'), and renames it to 'delta' + to converge the branch schema on main's post-sync state. + + Scenario + -------- + 1. Create COT + field 'alpha' + CO in main. + 2. Provision branch. + 3. Branch: alpha→beta (update CO), beta→gamma (update CO, add CO). + 4. Main: alpha→delta (update CO, add CO). + 5. sync(): apply main's changes to branch — column converges to 'delta'. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='seq_sync_cot', slug='seq-sync-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='alpha', label='Alpha', type='text', + ) + co = cot.get_model().objects.create(alpha='original') + + co_pk = co.pk + branch = _provision_branch('Seq Sync Branch', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch: alpha → beta → gamma ────────────────────────────────── + with activate_branch(branch), event_tracking(branch_request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + f.snapshot() + f.name = 'beta' + f.label = 'Beta' + f.save() + BM = cot.get_model() + co_b = BM.objects.get(pk=co_pk) + co_b.snapshot() + co_b.beta = 'at beta' + co_b.save() + + with activate_branch(branch), event_tracking(branch_request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='beta') + f.snapshot() + f.name = 'gamma' + f.label = 'Gamma' + f.save() + BM = cot.get_model() + co_g = BM.objects.get(pk=co_pk) + co_g.snapshot() + co_g.gamma = 'at gamma' + co_g.save() + BM.objects.create(gamma='new in branch') + + # ── main: alpha → delta ─────────────────────────────────────────── + with event_tracking(request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + f.name = 'delta' + f.label = 'Delta' + f.save() + MM = cot.get_model() + co_m = MM.objects.get(pk=co_pk) + co_m.snapshot() + co_m.delta = 'updated in main' + co_m.save() + MM.objects.create(delta='main new') + + # ── sync ────────────────────────────────────────────────────────── + try: + branch.sync(user=self.user, commit=True) + except Exception as exc: + self.fail(f'sync() must not raise when schemas have conflicting renames: {exc!r}') + + branch.refresh_from_db() + + # _schema_alter_field resolved the conflict by looking up the live column + # ('gamma') in the branch and renaming it to main's target name ('delta'). + branch_conn = connections[branch.connection_name] + with branch_conn.cursor() as cursor: + branch_cols = { + col.name + for col in branch_conn.introspection.get_table_description( + cursor, cot.get_database_table_name(), + ) + } + self.assertIn('delta', branch_cols, 'Branch column must converge to delta after sync') + self.assertNotIn('gamma', branch_cols, 'gamma column must be gone after sync') + self.assertNotIn('beta', branch_cols, 'beta column must be gone after sync') + self.assertNotIn('alpha', branch_cols, 'alpha column must be gone after sync') + + def test_sequential_renames_both_sides_merge(self): + """ + Branch renames A→B→C; main renames A→D independently. + merge() applies branch's rename chain to main. + + When merging the first branch rename (alpha→beta) into main, 'alpha' no + longer exists in main (it was renamed to 'delta') and 'beta' doesn't exist + either. _schema_alter_field detects the conflict, looks up the live column + in main ('delta'), and renames it to 'beta'. The second branch rename + (beta→gamma) then finds 'beta' in main and renames it normally to 'gamma'. + + Expected after merge: main's physical column is 'gamma'. + """ + request = _make_request(self.user) + + with event_tracking(request): + cot = CustomObjectType.objects.create(name='seq_merge_cot', slug='seq-merge-cot') + CustomObjectTypeField.objects.create( + custom_object_type=cot, name='alpha', label='Alpha', type='text', + ) + co = cot.get_model().objects.create(alpha='original') + + co_pk = co.pk + branch = _provision_branch('Seq Merge Conflict Branch', 'iterative', self.user) + branch_request = _make_request(self.user) + + # ── branch: alpha → beta → gamma ────────────────────────────────── + with activate_branch(branch), event_tracking(branch_request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + f.snapshot() + f.name = 'beta' + f.label = 'Beta' + f.save() + BM = cot.get_model() + co_b = BM.objects.get(pk=co_pk) + co_b.snapshot() + co_b.beta = 'at beta' + co_b.save() + + with activate_branch(branch), event_tracking(branch_request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='beta') + f.snapshot() + f.name = 'gamma' + f.label = 'Gamma' + f.save() + BM = cot.get_model() + co_g = BM.objects.get(pk=co_pk) + co_g.snapshot() + co_g.gamma = 'at gamma' + co_g.save() + BM.objects.create(gamma='new in branch') + + # ── main: alpha → delta ─────────────────────────────────────────── + with event_tracking(request): + f = CustomObjectTypeField.objects.get(custom_object_type=cot, name='alpha') + f.name = 'delta' + f.label = 'Delta' + f.save() + + # ── merge ───────────────────────────────────────────────────────── + try: + branch.merge(user=self.user, commit=True) + except Exception as exc: + self.fail(f'merge() must not raise when schemas have conflicting renames: {exc!r}') + + branch.refresh_from_db() + + # _schema_alter_field resolved the conflict for the first branch rename + # (alpha→beta): it found 'delta' (main's live column) and renamed it to + # 'beta'. The second rename (beta→gamma) then proceeded normally. + # Main's final physical column should be 'gamma'. + from django.db import connection as main_conn + with main_conn.cursor() as cursor: + main_cols = { + col.name + for col in main_conn.introspection.get_table_description( + cursor, cot.get_database_table_name(), + ) + } + self.assertIn('gamma', main_cols, 'Main column must be gamma after merge') + self.assertNotIn('delta', main_cols, 'delta column must be gone after merge') + self.assertNotIn('beta', main_cols, 'beta column must be gone after merge') + self.assertNotIn('alpha', main_cols, 'alpha column must be gone after merge') + + +@unittest.skipUnless(HAS_BRANCHING, 'netbox-branching is not installed') +class SequentialRenameSquashTestCase(SequentialRenameTestCase, TransactionTestCase): + """Run SequentialRenameTestCase with the squash merge strategy.""" + MERGE_STRATEGY = 'squash' + + def test_sequential_renames_alpha_beta_gamma_merge(self): + self._run_sequential_rename_merge('seq_squash_cot', 'seq-squash-cot') diff --git a/netbox_custom_objects/tests/test_deletion.py b/netbox_custom_objects/tests/test_deletion.py index 047aee40..4dcf5f8d 100644 --- a/netbox_custom_objects/tests/test_deletion.py +++ b/netbox_custom_objects/tests/test_deletion.py @@ -21,21 +21,9 @@ class DeletionTestCase(TransactionCleanupMixin, CustomObjectsTestCase, Transacti """Test deletion scenarios with cascading effects.""" def setUp(self): - """Purge stale dynamic models left in the app registry by earlier TestCase - classes whose setUpTestData transaction was rolled back (dropping the backing - tables). Leaving them registered causes Django's cascade-delete collector to - query non-existent tables when a related core object is deleted. - """ + # TransactionCleanupMixin.setUp() purges stale generated models and + # CustomObjectsTestCase.setUp() creates the test user and client. super().setUp() - stale = [ - name - for name, model in list(django_apps.all_models.get(APP_LABEL, {}).items()) - if getattr(model, '_generated_table_model', False) - ] - for name in stale: - django_apps.all_models[APP_LABEL].pop(name, None) - if stale: - django_apps.clear_cache() # ------------------------------------------------------------------ # Helpers diff --git a/netbox_custom_objects/tests/test_field_types.py b/netbox_custom_objects/tests/test_field_types.py index 4dff11da..c23c0f9e 100644 --- a/netbox_custom_objects/tests/test_field_types.py +++ b/netbox_custom_objects/tests/test_field_types.py @@ -356,11 +356,11 @@ def test_datetime_field_creation(self): name="created_datetime", label="Created DateTime", type="datetime", - default="2023-01-01T12:00:00" + default="2023-01-01T12:00:00+00:00" ) self.assertEqual(field.type, "datetime") - self.assertEqual(field.default, "2023-01-01T12:00:00") + self.assertEqual(field.default, "2023-01-01T12:00:00+00:00") def test_datetime_field_validation(self): """Test datetime field validation.""" @@ -390,8 +390,9 @@ def test_datetime_field_model_generation(self): type="datetime" ) + import datetime as dt model = self.custom_object_type.get_model() - test_datetime = datetime(2023, 1, 1, 12, 0, 0) + test_datetime = datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc) instance = model.objects.create(name="Test", created_datetime=test_datetime) self.assertEqual(instance.created_datetime, test_datetime) diff --git a/netbox_custom_objects/tests/test_filtersets.py b/netbox_custom_objects/tests/test_filtersets.py index 2965c5c9..b33ed65f 100644 --- a/netbox_custom_objects/tests/test_filtersets.py +++ b/netbox_custom_objects/tests/test_filtersets.py @@ -573,8 +573,9 @@ def setUpTestData(cls): ) model = cls.cot.get_model() - cls.obj_morning = model.objects.create(ts=datetime.datetime(2025, 3, 10, 9, 0, 0)) - cls.obj_evening = model.objects.create(ts=datetime.datetime(2025, 3, 10, 18, 30, 0)) + utc = datetime.timezone.utc + cls.obj_morning = model.objects.create(ts=datetime.datetime(2025, 3, 10, 9, 0, 0, tzinfo=utc)) + cls.obj_evening = model.objects.create(ts=datetime.datetime(2025, 3, 10, 18, 30, 0, tzinfo=utc)) def _search(self, value): model = self.cot.get_model() diff --git a/netbox_custom_objects/utilities.py b/netbox_custom_objects/utilities.py index 432b90bb..ff98c0b1 100644 --- a/netbox_custom_objects/utilities.py +++ b/netbox_custom_objects/utilities.py @@ -177,6 +177,15 @@ def get_viewname(model, action=None, rest_api=False): return viewname +def is_in_branch() -> bool: + """Return True if the current request is executing inside a branch context.""" + try: + from netbox_branching.contextvars import active_branch + return active_branch.get() is not None + except ImportError: + return False + + def generate_model(*args, **kwargs): """ Create a model. @@ -194,18 +203,3 @@ def generate_model(*args, **kwargs): model = type(*args, **kwargs) return model - - -def is_in_branch(): - """ - Check if currently operating within a branch. - - Returns: - bool: True if currently in a branch, False otherwise. - """ - try: - from netbox_branching.contextvars import active_branch - return active_branch.get() is not None - except ImportError: - # Branching plugin not installed - return False diff --git a/netbox_custom_objects/views.py b/netbox_custom_objects/views.py index 352238cd..4af7494e 100644 --- a/netbox_custom_objects/views.py +++ b/netbox_custom_objects/views.py @@ -29,7 +29,7 @@ from extras.choices import CustomFieldTypeChoices from netbox_custom_objects.constants import APP_LABEL from netbox_custom_objects.dynamic_forms import build_filterset_form_class -from netbox_custom_objects.utilities import extract_cot_id_from_model_name, is_in_branch +from netbox_custom_objects.utilities import extract_cot_id_from_model_name logger = logging.getLogger("netbox_custom_objects.views") @@ -591,11 +591,6 @@ def custom_save(self, commit=True): return form_class - def get_extra_context(self, request, obj): - return { - 'branch_warning': is_in_branch(), - } - @register_model_view(CustomObject, "delete") class CustomObjectDeleteView(generic.ObjectDeleteView): @@ -705,11 +700,6 @@ def get_form(self, queryset): return form - def get_extra_context(self, request): - return { - 'branch_warning': is_in_branch(), - } - @register_model_view(CustomObject, "bulk_delete", path="delete", detail=False) class CustomObjectBulkDeleteView(CustomObjectTableMixin, generic.BulkDeleteView): @@ -796,11 +786,6 @@ def get_model_form(self, queryset): return form - def get_extra_context(self, request): - return { - 'branch_warning': is_in_branch(), - } - class CustomObjectJournalView(ConditionalLoginRequiredMixin, View): """