From 2b7e2bdb8364ede3616199c68902e62029005eae Mon Sep 17 00:00:00 2001 From: Johnathan Kupferer Date: Wed, 10 Sep 2025 17:04:59 -0400 Subject: [PATCH] Change to use label selector - Use label_selector from kopf fork - Switch to watch other resourcehandles outside of kopf management - Fix race condition in init resource handle status --- Containerfile | 3 +- operator/operator.py | 81 ++++---- operator/resourcehandle.py | 189 ++++++++++++------ .../tasks/test-pool-04.yaml | 2 +- 4 files changed, 165 insertions(+), 110 deletions(-) diff --git a/Containerfile b/Containerfile index 05e6363..cf43ef0 100644 --- a/Containerfile +++ b/Containerfile @@ -7,7 +7,8 @@ COPY . /tmp/src RUN rm -rf /tmp/src/.git* && \ chown -R 1001 /tmp/src && \ chgrp -R 0 /tmp/src && \ - chmod -R g+w /tmp/src + chmod -R g+w /tmp/src && \ + pip install git+https://github.com/rhpds/kopf.git@add-label-selector USER 1001 diff --git a/operator/operator.py b/operator/operator.py index 01174f3..fbdd366 100755 --- a/operator/operator.py +++ b/operator/operator.py @@ -55,11 +55,13 @@ async def startup(logger: kopf.ObjectLogger, settings: kopf.OperatorSettings, ** # Preload for matching ResourceClaim templates if Poolboy.operator_mode_all_in_one or Poolboy.operator_mode_resource_handler: await ResourceHandle.preload(logger=logger) - - + if Poolboy.operator_mode_resource_handler: + ResourceHandle.start_watch_other() @kopf.on.cleanup() async def cleanup(logger: kopf.ObjectLogger, **_): + if Poolboy.operator_mode_resource_handler: + ResourceHandle.stop_watch_other() await ResourceWatch.stop_all() await Poolboy.on_cleanup() @@ -71,18 +73,16 @@ async def resource_provider_event(event: Mapping, logger: kopf.ObjectLogger, **_ else: await ResourceProvider.register(definition=definition, logger=logger) -labels_arg = { - Poolboy.ignore_label: kopf.ABSENT, -} +label_selector = f"!{Poolboy.ignore_label}" if Poolboy.operator_mode_resource_handler: - labels_arg[Poolboy.resource_handler_idx_label] = str(Poolboy.resource_handler_idx) + label_selector += f",{Poolboy.resource_handler_idx_label}={Poolboy.resource_handler_idx}" if Poolboy.operator_mode_manager: # In manager mode just label ResourceClaims, ResourceHandles, and ResourcePools # to assign the correct handler. @kopf.on.event( ResourceClaim.api_group, ResourceClaim.api_version, ResourceClaim.plural, - labels=labels_arg, + label_selector=label_selector, ) async def label_resource_claim( event: Mapping, @@ -97,7 +97,7 @@ async def label_resource_claim( @kopf.on.event( ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, - labels=labels_arg, + label_selector=label_selector, ) async def label_resource_handle( event: Mapping, @@ -112,7 +112,7 @@ async def label_resource_handle( @kopf.on.event( ResourcePool.api_group, ResourcePool.api_version, ResourcePool.plural, - labels=labels_arg, + label_selector=label_selector, ) async def label_resource_pool( event: Mapping, @@ -134,15 +134,18 @@ async def label_resource_pool( @kopf.on.create( ResourceClaim.api_group, ResourceClaim.api_version, ResourceClaim.plural, - id='resource_claim_create', labels=labels_arg, + label_selector=label_selector, + id='resource_claim_create', ) @kopf.on.resume( ResourceClaim.api_group, ResourceClaim.api_version, ResourceClaim.plural, - id='resource_claim_resume', labels=labels_arg, + label_selector=label_selector, + id='resource_claim_resume', ) @kopf.on.update( ResourceClaim.api_group, ResourceClaim.api_version, ResourceClaim.plural, - id='resource_claim_update', labels=labels_arg, + label_selector=label_selector, + id='resource_claim_update', ) async def resource_claim_event( annotations: kopf.Annotations, @@ -170,7 +173,7 @@ async def resource_claim_event( @kopf.on.delete( ResourceClaim.api_group, ResourceClaim.api_version, ResourceClaim.plural, - labels=labels_arg, + label_selector=label_selector, ) async def resource_claim_delete( annotations: kopf.Annotations, @@ -201,7 +204,7 @@ async def resource_claim_delete( ResourceClaim.api_group, ResourceClaim.api_version, ResourceClaim.plural, cancellation_timeout = 1, initial_delay = Poolboy.manage_handles_interval, - labels = labels_arg, + label_selector=label_selector, ) async def resource_claim_daemon( annotations: kopf.Annotations, @@ -239,35 +242,20 @@ async def resource_claim_daemon( except asyncio.CancelledError: pass - if Poolboy.operator_mode_resource_handler: - @kopf.on.event( - ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, - id='resource_handle_register', - labels={Poolboy.ignore_label: kopf.ABSENT}, - ) - async def resource_handle_register( - event: Mapping, - logger: kopf.ObjectLogger, - **_ - ) -> None: - """Track ResourceHandles managed by other handlers.""" - definition = event['object'] - if event['type'] == 'DELETED': - await ResourceHandle.unregister(name=definition['metadata']['name'], logger=logger) - else: - await ResourceHandle.register_definition(definition=definition) - @kopf.on.create( ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, - id='resource_handle_create', labels=labels_arg, + id='resource_handle_create', + label_selector=label_selector, ) @kopf.on.resume( ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, - id='resource_handle_resume', labels=labels_arg, + id='resource_handle_resume', + label_selector=label_selector, ) @kopf.on.update( ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, - id='resource_handle_update', labels=labels_arg, + id='resource_handle_update', + label_selector=label_selector, ) async def resource_handle_event( annotations: kopf.Annotations, @@ -297,7 +285,7 @@ async def resource_handle_event( @kopf.on.delete( ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, - labels=labels_arg, + label_selector=label_selector, ) async def resource_handle_delete( annotations: kopf.Annotations, @@ -330,7 +318,7 @@ async def resource_handle_delete( ResourceHandle.api_group, ResourceHandle.api_version, ResourceHandle.plural, cancellation_timeout = 1, initial_delay = Poolboy.manage_handles_interval, - labels = labels_arg, + label_selector=label_selector, ) async def resource_handle_daemon( annotations: kopf.Annotations, @@ -370,15 +358,18 @@ async def resource_handle_daemon( @kopf.on.create( ResourcePool.api_group, ResourcePool.api_version, ResourcePool.plural, - id='resource_pool_create', labels=labels_arg, + id='resource_pool_create', + label_selector=label_selector, ) @kopf.on.resume( ResourcePool.api_group, ResourcePool.api_version, ResourcePool.plural, - id='resource_pool_resume', labels=labels_arg, + id='resource_pool_resume', + label_selector=label_selector, ) @kopf.on.update( ResourcePool.api_group, ResourcePool.api_version, ResourcePool.plural, - id='resource_pool_update', labels=labels_arg, + id='resource_pool_update', + label_selector=label_selector, ) async def resource_pool_event( annotations: kopf.Annotations, @@ -406,7 +397,7 @@ async def resource_pool_event( @kopf.on.delete( Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', - labels=labels_arg, + label_selector=label_selector, ) async def resource_pool_delete( annotations: kopf.Annotations, @@ -436,7 +427,7 @@ async def resource_pool_delete( @kopf.daemon(Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', cancellation_timeout = 1, initial_delay = Poolboy.manage_pools_interval, - labels = labels_arg, + label_selector=label_selector, ) async def resource_pool_daemon( annotations: kopf.Annotations, @@ -477,11 +468,13 @@ async def resource_pool_daemon( ): @kopf.on.create( Poolboy.operator_domain, Poolboy.operator_version, 'resourcewatches', - id='resource_watch_create', labels=labels_arg, + id='resource_watch_create', + label_selector=label_selector, ) @kopf.on.resume( Poolboy.operator_domain, Poolboy.operator_version, 'resourcewatches', - id='resource_watch_resume', labels=labels_arg, + id='resource_watch_resume', + label_selector=label_selector, ) async def resource_watch_create_or_resume( annotations: kopf.Annotations, diff --git a/operator/resourcehandle.py b/operator/resourcehandle.py index 3ea196c..d3f7517 100644 --- a/operator/resourcehandle.py +++ b/operator/resourcehandle.py @@ -1,4 +1,5 @@ import asyncio +import logging from copy import deepcopy from datetime import datetime, timedelta, timezone @@ -81,6 +82,7 @@ class ResourceHandle(KopfObject): bound_instances = {} unbound_instances = {} class_lock = asyncio.Lock() + watch_other_task = None @classmethod def __register_definition(cls, definition: Mapping) -> ResourceHandleT: @@ -520,6 +522,11 @@ async def get(cls, name: str, ignore_deleting=True, use_cache=True) -> ResourceH def get_from_cache(cls, name: str) -> ResourceHandleT|None: return cls.all_instances.get(name) + @classmethod + def start_watch_other(cls) -> None: + logger = logging.getLogger('watch_other_handles') + cls.watch_other_task = asyncio.create_task(cls.watch_other(logger)) + @classmethod async def get_unbound_handles_for_pool( cls, @@ -616,6 +623,13 @@ async def register_definition(cls, definition: Mapping) -> ResourceHandleT: async with cls.class_lock: return cls.__register_definition(definition) + @classmethod + async def stop_watch_other(cls) -> None: + if cls.watch_other_task is None: + return + cls.watch_other_task.cancel() + await cls.watch_other_task + @classmethod async def unregister(cls, name: str) -> ResourceHandleT|None: async with cls.class_lock: @@ -624,6 +638,40 @@ async def unregister(cls, name: str) -> ResourceHandleT|None: resource_handle.__unregister() return resource_handle + @classmethod + async def watch_other(cls, logger) -> None: + while True: + try: + # FIXME - clear stale cache entries + await cls.__watch_other(logger) + except kubernetes_asyncio.client.exceptions.ApiException as exception: + if exception.status != 410: + logger.exception("Error watching other resourcehandles") + await asyncio.sleep(10) + except: + logger.exception("Error watching other resourcehandles") + await asyncio.sleep(10) + + @classmethod + async def __watch_other(cls, logger) -> None: + logger.info("HERE") + watch = kubernetes_asyncio.watch.Watch() + async for event in watch.stream( + Poolboy.custom_objects_api.list_namespaced_custom_object, + group=cls.api_group, + label_selector=f"!{Poolboy.ignore_label},{Poolboy.resource_handler_idx_label}!={Poolboy.resource_handler_idx}", + namespace=Poolboy.namespace, + plural=cls.plural, + version=cls.api_version, + ): + logger.info(f"EVENT {event}") + event_obj = event['object'] + event_type = event['type'] + if event_type == 'DELETED': + await cls.unregister(event_obj['metadata']['name']) + else: + await cls.register_definition(event_obj) + def __str__(self) -> str: return f"ResourceHandle {self.name}" @@ -833,44 +881,63 @@ def __lifespan_value_as_timedelta(self, raise kopf.TemporaryError(f"Failed to parse {name} time interval: {value}", delay=60) return timedelta(seconds=seconds) - async def __manage_init_status_resources(self) -> None: + async def __manage_init_status_resources(self, + logger: kopf.ObjectLogger, + ) -> None: """Initialize resources in status from spec.""" - patch = [] - if not self.status: - patch.append({ - "op": "add", - "path": "/status", - "value": {}, - }) - status_resources = self.status.get('resources', []) - if 'resources' not in self.status: - patch.append({ - "op": "add", - "path": "/status/resources", - "value": [], - }) - for idx, resource in enumerate(self.resources): - if idx < len(status_resources): - status_resource = status_resources[idx] - else: - status_resource = {} - status_resources.append(status_resource) - patch.append({ - "op": "add", - "path": f"/status/resources/{idx}", - "value": {}, - }) - - if 'name' in resource and resource['name'] != status_resource.get('name'): - status_resource['name'] = resource['name'] - patch.append({ - "op": "add", - "path": f"/status/resources/{idx}/name", - "value": resource['name'], - }) - - if patch: - await self.json_patch_status(patch) + attempt = 0 + while True: + try: + set_resources = [] + for idx, resource in enumerate(self.resources): + if idx < len(self.status_resources): + entry = deepcopy(self.status_resources[idx]) + else: + entry = {} + if 'name' in resource and resource['name'] != entry.get('name'): + entry['name'] = resource['name'] + set_resources.append(entry) + + patch = [] + if not self.status: + patch.extend(({ + "op": "test", + "path": "/status", + "value": None, + }, { + "op": "add", + "path": "/status", + "value": {}, + })) + if 'resources' not in self.status: + patch.extend(({ + "op": "test", + "path": "/status/resources", + "value": None, + }, { + "op": "add", + "path": "/status/resources", + "value": set_resources, + })) + else: + patch.extend(({ + "op": "test", + "path": "/status/resources", + "value": self.status_resources, + }, { + "op": "replace", + "path": "/status/resources", + "value": set_resources, + })) + if 0 == len(patch): + return + await self.json_patch_status(patch) + return + except kubernetes_asyncio.client.exceptions.ApiException as exception: + if attempt > 2: + logger.exception(f"{self} failed status patch: {patch}") + raise + attempt += 1 async def __manage_check_delete(self, logger: kopf.ObjectLogger, @@ -1147,7 +1214,7 @@ async def manage(self, logger: kopf.ObjectLogger) -> None: ) # Initialize status.resources - await self.__manage_init_status_resources() + await self.__manage_init_status_resources(logger=logger) # Get resource providers for managed resources resource_providers = await self.get_resource_providers() @@ -1360,6 +1427,14 @@ async def update_status(self, """Update status from resources state.""" status = self.status + while len(self.resources) < len(resource_states): + logger.warning(f"{self} update status with resource states longer that list of resources, attempting refetch: {len(self.resources)} < {len(resource_states)}") + await asyncio.sleep(0.2) + await self.refetch() + if len(self.resources) < len(resource_states): + logger.error(f"{self} update status with resource states longer that list of resources after refetch: {len(self.resources)} < {len(resource_states)}") + return + # Create consolidated information about resources resources = deepcopy(self.resources) for idx, state in enumerate(resource_states): @@ -1371,32 +1446,9 @@ async def update_status(self, have_ready_resource = False all_resources_ready = True - if not status: - patch.append({ - "op": "add", - "path": "/status", - "value": {}, - }) - status_resources = status.get('resources') - if status_resources is None: - status_resources = [] - patch.append({ - "op": "add", - "path": "/status/resources", - "value": [], - }) - + status_resources = status.get('resources', []) for idx, resource in enumerate(resources): - if idx < len(status_resources): - status_resource = status_resources[idx] - else: - status_resource = {} - patch.append({ - "op": "add", - "path": f"/status/resources/{idx}", - "value": {}, - }) - + status_resource = status_resources[idx] if idx < len(status_resources) else {} resource_healthy = None resource_ready = False @@ -1508,7 +1560,16 @@ async def update_status(self, except Exception: logger.exception(f"Failed to generate status summary for {self}") if patch: - await self.json_patch_status(patch) + while True: + try: + await self.json_patch_status(patch) + break + except kubernetes_asyncio.client.exceptions.ApiException: + patch_attempt += 1 + if patch_attempt > 5: + logger.exception(f"Failed to patch status on {self}") + return + await asyncio.sleep(0.2) if resource_claim: await resource_claim.update_status_from_handle( diff --git a/test/roles/poolboy_test_simple/tasks/test-pool-04.yaml b/test/roles/poolboy_test_simple/tasks/test-pool-04.yaml index 1acdc11..2b5a02d 100644 --- a/test/roles/poolboy_test_simple/tasks/test-pool-04.yaml +++ b/test/roles/poolboy_test_simple/tasks/test-pool-04.yaml @@ -80,7 +80,7 @@ __unbound_handles[0].status.resources[0].healthy != false until: r_get_resource_handles is success delay: 1 - retries: 10 + retries: 15 - name: Set facts from for ResourcePool test-pool-04 ResourceHandles vars: