From da19d8075370d039b8f23eae2604ca26a9769094 Mon Sep 17 00:00:00 2001 From: Vitalii <87299468+toderian@users.noreply.github.com> Date: Tue, 3 Mar 2026 15:50:33 +0200 Subject: [PATCH 01/10] Feat car dynamic env improvements (#367) * feat: add advanced dynamic env for CAR * chore: increment version * fix: move container ip extraction to a method * fix: use kwargs for dynamic env setup methods * fix: error log --- .../container_apps/container_app_runner.py | 9 ++- .../container_apps/container_utils.py | 70 +++++++++++++++++-- ver.py | 2 +- 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index a96afc0a..f3a4af01 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -1282,7 +1282,6 @@ def on_init(self): self.reset_tunnel_engine() - self._configure_dynamic_env() # setup dynamic env vars for the container self._setup_resource_limits_and_ports() # setup container resource limits (CPU, GPU, memory, ports) self._configure_volumes() # setup container volumes self._configure_file_volumes() # setup file volumes with dynamic content @@ -2601,6 +2600,10 @@ def _setup_semaphore_env(self): if port: self.semaphore_set_env('PORT', str(port)) self.semaphore_set_env('URL', 'http://{}:{}'.format(localhost_ip, port)) + container_ip = self._get_container_ip() + self.Pd(f"Container IP address: {container_ip}") + if container_ip: + self.semaphore_set_env('CONTAINER_IP', container_ip) return @@ -3094,7 +3097,6 @@ def _restart_container(self, stop_reason=None): # Set state after reset self._set_container_state(ContainerState.RESTARTING, stop_reason or StopReason.UNKNOWN) - self._configure_dynamic_env() self._setup_resource_limits_and_ports() self._configure_volumes() self._configure_file_volumes() @@ -3278,7 +3280,8 @@ def _handle_initial_launch(self): if not self._wait_for_semaphores(): return # Still # end if - # Semaphores ready - now setup env vars with semaphore values + # Semaphores ready - dynamic env to resolve shmem values, then setup env + self._configure_dynamic_env() self._setup_env_and_ports() # end if diff --git a/extensions/business/container_apps/container_utils.py b/extensions/business/container_apps/container_utils.py index 59d82e16..a54e8e9d 100644 --- a/extensions/business/container_apps/container_utils.py +++ b/extensions/business/container_apps/container_utils.py @@ -89,6 +89,8 @@ def _get_default_env_vars(self): "R1EN_R1FS_API_URL": f"http://{localhost_ip}:31235", "EE_CHAINSTORE_PEERS": str_chainstore_peers, "R1EN_CHAINSTORE_PEERS": str_chainstore_peers, + + "R1EN_CONTAINER_IP": self._get_container_ip() # OBSERVATION: From now on only add new env vars with R1EN_ prefix # to avoid missunderstandings with EE_ prefixed vars that @@ -138,7 +140,30 @@ def _get_chainstore_response_data(self): return data - def _setup_dynamic_env_var_host_ip(self): + def _get_container_ip(self): + """ + Get the container's IP address from Docker network settings. + + If a semaphore is configured, also sets the CONTAINER_IP env var + in shared memory for paired plugins. + + Returns + ------- + str or None + Container IP address, or None if unavailable + """ + if not self.container: + return None + try: + self.container.reload() + container_ip = self.container.attrs['NetworkSettings']['IPAddress'] + return container_ip + except Exception as e: + self.P(f"Could not get container IP: {e}", color='r') + return None + + + def _setup_dynamic_env_var_host_ip(self, **kwargs): """ Get host IP address for dynamic environment variable. @@ -149,7 +174,36 @@ def _setup_dynamic_env_var_host_ip(self): """ return self.log.get_localhost_ip() - def _setup_dynamic_env_var_some_other_calc_type(self): + + def _setup_dynamic_env_var_shmem(self, path, **kwargs): + """ + Get a value from shared memory semaphore environment. + + Parameters + ---------- + path : list or str + If list: [semaphore_key, env_key] - joined with '_' to form full key + If str: the full prefixed key to look up directly + + Returns + ------- + str + The semaphore env value, or empty string if not found + """ + if isinstance(path, list): + if len(path) != 2: + self.P(f"Invalid shmem path: {path}. Expected [semaphore_key, env_key]", color='r') + return "" + value = self.semaphore_get_env_value(path[0], path[1]) + elif isinstance(path, str): + value = self.semaphore_get_env_value_by_path(path) + else: + self.P(f"Invalid shmem path type: {type(path)}. Expected list or str", color='r') + return "" + return str(value) if value else "" + + + def _setup_dynamic_env_var_some_other_calc_type(self, **kwargs): """ Example dynamic environment variable calculator. @@ -206,11 +260,15 @@ def _configure_dynamic_env(self): if callable(func): # Call the function and append its result to the variable value try: - candidate_value = func() + part_kwargs = {k: v for k, v in variable_part.items() if k != 'type'} + candidate_value = func(**part_kwargs) found = True - except: - self.P(f"Error calling function {func_name} for dynamic env var {variable_name}", color='r') - #endif callable + except Exception as e: + self.P( + "Error calling dynamic env var function {} for variable {}: {}".format( + func_name, variable_name, e), + color='r') + # endif callable #endif hasattr if not found: self.P(f"Dynamic env var {variable_name} has invalid type: {part_type}", color='r') diff --git a/ver.py b/ver.py index 767bdb5f..87547e9f 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.80' +__VER__ = '2.10.81' From 9a913a72f1eaa3c3cf6703bd882c0cf022d9fc9a Mon Sep 17 00:00:00 2001 From: toderian Date: Tue, 3 Mar 2026 17:19:11 +0200 Subject: [PATCH 02/10] fix: wrap liveness api services statuses check in try except --- extensions/business/liveness/liveness_api.py | 14 +++++++++----- ver.py | 2 +- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/extensions/business/liveness/liveness_api.py b/extensions/business/liveness/liveness_api.py index 742cf30b..87f13172 100644 --- a/extensions/business/liveness/liveness_api.py +++ b/extensions/business/liveness/liveness_api.py @@ -200,11 +200,15 @@ def __get_service_status(self, service_config): def __get_all_services_statuses(self): - statuses = {} - for service in self.cfg_monitored_services.keys(): - cfg = self.__get_service_config(service) - statuses[service] = self.__get_service_status(cfg) - return statuses + try: + statuses = {} + for service in self.cfg_monitored_services.keys(): + cfg = self.__get_service_config(service) + statuses[service] = self.__get_service_status(cfg) + return statuses + except Exception as e: + self.P("Error while getting services statuses: {}".format(str(e))) + return {} @BasePlugin.endpoint # /get_liveness_data diff --git a/ver.py b/ver.py index 87547e9f..355b0121 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.81' +__VER__ = '2.10.82' From d111a6c318904e162bba0b6e1e59ef43071d37a4 Mon Sep 17 00:00:00 2001 From: toderian Date: Wed, 4 Mar 2026 10:56:19 +0200 Subject: [PATCH 03/10] fix: semaphored keys setup on shmem dynamic env dependencies --- extensions/business/deeploy/deeploy_const.py | 4 + .../business/deeploy/deeploy_manager_api.py | 3 +- extensions/business/deeploy/deeploy_mixin.py | 304 ++++++++++++++---- 3 files changed, 247 insertions(+), 64 deletions(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 69ae69fd..44f8ca84 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -52,6 +52,8 @@ class DEEPLOY_KEYS: PLUGINS = "plugins" PLUGIN_INSTANCES = "instances" PLUGIN_INSTANCE_ID = "instance_id" + PLUGIN_NAME = "plugin_name" + DEPENDENCY_TREE = "dependency_tree" # Auth result keys SENDER = "sender" SENDER_ESCROW = "sender_escrow" @@ -104,6 +106,8 @@ class DEEPLOY_ERRORS: PLUGINS1 = "ERR25_DEEPLOY_PLUGINS1" # Invalid plugins array structure PLUGINS2 = "ERR26_DEEPLOY_PLUGINS2" # Plugin missing required field PLUGINS3 = "ERR27_DEEPLOY_PLUGINS3" # Instance validation failed + DEPENDENCY_TREE1 = "ERR28_DEEPLOY_DEPENDENCY_TREE1" # Invalid dependency_tree entry + DEPENDENCY_TREE2 = "ERR29_DEEPLOY_DEPENDENCY_TREE2" # Circular dependency detected class DEEPLOY_RESOURCES: # Result dictionary keys diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index 9ca53bfd..063fee34 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -255,7 +255,8 @@ def _process_pipeline_request( if job_app_type not in JOB_APP_TYPES_ALL: raise ValueError(f"Invalid job_app_type '{job_app_type}'. Expected one of {JOB_APP_TYPES_ALL}.") else: - job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)) + plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs) + job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection) if job_app_type not in JOB_APP_TYPES_ALL: job_app_type = JOB_APP_TYPES.NATIVE self.P(f"Detected job app type: {job_app_type}") diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 9eac1679..75b19a95 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -100,7 +100,10 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, """ Create new pipelines on each node and set CSTORE `response_key` for the "callback" action """ - plugins = self.deeploy_prepare_plugins(inputs) + plugins, name_to_instance = self.deeploy_prepare_plugins(inputs) + if name_to_instance: + plugins = self._resolve_shmem_references(plugins, name_to_instance, app_id) + self._validate_dependency_tree(inputs) plugins = self._ensure_runner_cstore_auth_env( app_id=app_id, prepared_plugins=plugins, @@ -250,7 +253,7 @@ def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, ts = self.time() detected_job_app_type = job_app_type if not detected_job_app_type: - plugins_for_detection = self.deeploy_prepare_plugins(inputs) + plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs) detected_job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection) if not dct_deeploy_specs: @@ -1331,7 +1334,7 @@ def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create, debug=Fa if not job_app_type: try: self.Pd(" Detecting job app type from plugins...") - job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)) + job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)[0]) self.Pd(f" Detected job app type: {job_app_type}") except Exception as exc: self.Pd(f" Failed to detect job app type: {exc}") @@ -1454,35 +1457,209 @@ def extract_instance_confs(instances): signature, instances = normalized_plugins[0] normalized_signature = signature.upper() if isinstance(signature, str) else '' - if normalized_signature == CONTAINER_APP_RUNNER_SIGNATURE: - service_keywords = ('postgresql', 'postgres', 'mongo', 'mongodb', 'mysql', 'mssql') - for instance_conf in instances: - if not isinstance(instance_conf, dict): - continue - image_value = ( - instance_conf.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE) - or instance_conf.get('IMAGE') - or instance_conf.get('image') - ) - if image_value and any(keyword in str(image_value).lower() for keyword in service_keywords): - return JOB_APP_TYPES.SERVICE - return JOB_APP_TYPES.GENERIC + if normalized_signature in (CONTAINER_APP_RUNNER_SIGNATURE, WORKER_APP_RUNNER_SIGNATURE): + # Multiple instances under one containerized signature → native multi-container + if len(instances) > 1: + return JOB_APP_TYPES.NATIVE + + if normalized_signature == CONTAINER_APP_RUNNER_SIGNATURE: + service_keywords = ('postgresql', 'postgres', 'mongo', 'mongodb', 'mysql', 'mssql') + for instance_conf in instances: + if not isinstance(instance_conf, dict): + continue + image_value = ( + instance_conf.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE) + or instance_conf.get('IMAGE') + or instance_conf.get('image') + ) + if image_value and any(keyword in str(image_value).lower() for keyword in service_keywords): + return JOB_APP_TYPES.SERVICE - if normalized_signature == WORKER_APP_RUNNER_SIGNATURE: return JOB_APP_TYPES.GENERIC return JOB_APP_TYPES.NATIVE + def _has_shmem_dynamic_env(self, plugins): + """Check if any plugin instance has shmem-type DYNAMIC_ENV entries.""" + for plugin in plugins: + instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + if not isinstance(instances, list): + continue + for instance in instances: + if not isinstance(instance, dict): + continue + dynamic_env = instance.get("DYNAMIC_ENV") + if not isinstance(dynamic_env, dict): + continue + for _key, entries in dynamic_env.items(): + if not isinstance(entries, list): + continue + for entry in entries: + if isinstance(entry, dict) and entry.get("type") == "shmem": + return True + return False + + def _resolve_shmem_references(self, plugins, name_to_instance, app_id): + """ + Resolve shmem-type DYNAMIC_ENV entries by replacing plugin names with semaphore keys. + Sets SEMAPHORE on provider instances and SEMAPHORED_KEYS on consumer instances. + + Parameters + ---------- + plugins : list + Prepared plugins payload. + name_to_instance : dict + Mapping of plugin_name -> { instance_id, signature }. + app_id : str + Application identifier used to build semaphore keys. + + Returns + ------- + list + Modified plugins with shmem references resolved. + """ + try: + # Build name -> semaphore_key map + name_to_semaphore = {} + for pname, info in name_to_instance.items(): + iid = info.get("instance_id", "") + name_to_semaphore[pname] = self.sanitize_name("{}__{}".format(app_id, iid)) + + # Build instance_id -> instance reference for quick lookup + instance_id_to_ref = {} + for plugin in plugins: + instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + for instance in instances: + if isinstance(instance, dict): + iid = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + if iid: + instance_id_to_ref[iid] = instance + + providers = set() # instance_ids that are referenced as providers + consumers = {} # instance_id -> set of semaphore keys they depend on + + # Scan all instances for shmem DYNAMIC_ENV entries + for plugin in plugins: + instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + for instance in instances: + if not isinstance(instance, dict): + continue + consumer_iid = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + dynamic_env = instance.get("DYNAMIC_ENV") + if not isinstance(dynamic_env, dict): + continue + + for _key, entries in dynamic_env.items(): + if not isinstance(entries, list): + continue + for entry in entries: + if not isinstance(entry, dict) or entry.get("type") != "shmem": + continue + path = entry.get("path") + if not isinstance(path, (list, tuple)) or len(path) < 2: + continue + provider_name = path[0] + if provider_name not in name_to_semaphore: + self.Pd(f"Warning: shmem references unknown plugin '{provider_name}'", color='y') + continue + + semaphore_key = name_to_semaphore[provider_name] + # Replace plugin name in path with the semaphore key + entry["path"] = [semaphore_key, path[1]] + + # Track provider and consumer + provider_info = name_to_instance.get(provider_name, {}) + provider_iid = provider_info.get("instance_id") + if provider_iid: + providers.add(provider_iid) + if consumer_iid: + if consumer_iid not in consumers: + consumers[consumer_iid] = set() + consumers[consumer_iid].add(semaphore_key) + + # Set SEMAPHORE on each provider instance + for provider_iid in providers: + ref = instance_id_to_ref.get(provider_iid) + if ref is not None: + sem_key = self.sanitize_name("{}__{}".format(app_id, provider_iid)) + ref["SEMAPHORE"] = sem_key + + # Set SEMAPHORED_KEYS on each consumer instance + for consumer_iid, sem_keys in consumers.items(): + ref = instance_id_to_ref.get(consumer_iid) + if ref is not None: + ref["SEMAPHORED_KEYS"] = list(sem_keys) + + return plugins + except Exception as exc: + self.Pd(f"Failed to resolve shmem references: {exc}", color='y') + return plugins + + def _validate_dependency_tree(self, inputs): + """ + Validate dependency_tree from inputs for circular dependencies. + Raises ValueError if a cycle is detected. + """ + dep_tree = inputs.get(DEEPLOY_KEYS.DEPENDENCY_TREE) + if not dep_tree or not isinstance(dep_tree, list): + return + + # Validate structure + for edge in dep_tree: + if not isinstance(edge, (list, tuple)) or len(edge) != 2: + raise ValueError( + f"{DEEPLOY_ERRORS.DEPENDENCY_TREE1}: Invalid dependency_tree entry: {edge}. Each entry must be a [from, to] pair." + ) + + # DFS cycle detection + graph = {} + for from_node, to_node in dep_tree: + if from_node not in graph: + graph[from_node] = [] + graph[from_node].append(to_node) + + WHITE, GRAY, BLACK = 0, 1, 2 + color = {node: WHITE for node in graph} + + def dfs(node): + color[node] = GRAY + for neighbor in graph.get(node, []): + if neighbor not in color: + color[neighbor] = WHITE + c = color[neighbor] + if c == GRAY: + return True + if c == WHITE and dfs(neighbor): + return True + color[node] = BLACK + return False + + for node in list(graph.keys()): + if color.get(node, WHITE) == WHITE: + if dfs(node): + raise ValueError( + f"{DEEPLOY_ERRORS.DEPENDENCY_TREE2}: Circular dependency detected in dependency_tree." + ) + return + def _autowire_native_container_semaphore(self, app_id, plugins, job_app_type): """ - Auto-configure semaphore settings for native + container pairs. + Auto-configure semaphore settings for native + container plugin groups. + Supports N plugins: all native plugins get SEMAPHORE, all container plugins + get SEMAPHORED_KEYS pointing to all native semaphore keys. + + Skips autowiring if: + - Not a native job app type + - Less than 2 plugins + - Semaphore config already exists (e.g., from explicit shmem wiring) + - Explicit shmem DYNAMIC_ENV entries detected Parameters ---------- app_id : str Application identifier used to build deterministic semaphore keys. plugins : list - Prepared plugins payload (expected to be a two-item native/container pair). + Prepared plugins payload. job_app_type : str Detected job application type. @@ -1494,85 +1671,76 @@ def _autowire_native_container_semaphore(self, app_id, plugins, job_app_type): try: if job_app_type != JOB_APP_TYPES.NATIVE: return plugins - # endif native job app type - if not isinstance(plugins, list) or len(plugins) != 2: + if not isinstance(plugins, list) or len(plugins) < 2: return plugins - # endif plugin pair check def has_semaphore_config(plugin_list): for plugin in plugin_list: instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] if not isinstance(instances, list): continue - # endif instances is list for instance in instances: if not isinstance(instance, dict): continue - # endif instance is dict if "SEMAPHORE" in instance or "SEMAPHORED_KEYS" in instance: return True - # endif instance has semaphore config - # endfor each instance - # endfor each plugin return False if has_semaphore_config(plugins): self.Pd("Skipping semaphore autowire; semaphore config already provided.") return plugins - # endif skip when provided - container_plugin = None - native_plugin = None + # Skip autowiring if explicit shmem references are present + if self._has_shmem_dynamic_env(plugins): + self.Pd("Skipping semaphore autowire; explicit shmem references detected.") + return plugins + + # Collect all native and container plugins + native_plugins = [] + container_plugins = [] for plugin in plugins: signature = plugin.get(self.ct.CONFIG_PLUGIN.K_SIGNATURE) if not signature: continue - # endif signature check normalized_signature = str(signature).upper() if normalized_signature in CONTAINERIZED_APPS_SIGNATURES: - container_plugin = container_plugin or plugin + container_plugins.append(plugin) else: - native_plugin = native_plugin or plugin - # endif signature type - # endfor each plugin - - if not container_plugin or not native_plugin: - return plugins - # endif both plugin types found + native_plugins.append(plugin) - native_instances = native_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - container_instances = container_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - - if not isinstance(native_instances, list) or not isinstance(container_instances, list): + if not container_plugins or not native_plugins: return plugins - # endif instance lists + # Set SEMAPHORE on all native instances semaphore_keys = [] - for instance in native_instances: - if not isinstance(instance, dict): - continue - # endif instance is dict - instance_id = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) - if not instance_id: + for native_plugin in native_plugins: + native_instances = native_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + if not isinstance(native_instances, list): continue - # endif instance id - semaphore_key = self.sanitize_name("{}__{}".format(app_id, instance_id)) - instance["SEMAPHORE"] = semaphore_key - semaphore_keys.append(semaphore_key) - # endfor each native instance + for instance in native_instances: + if not isinstance(instance, dict): + continue + instance_id = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + if not instance_id: + continue + semaphore_key = self.sanitize_name("{}__{}".format(app_id, instance_id)) + instance["SEMAPHORE"] = semaphore_key + semaphore_keys.append(semaphore_key) if not semaphore_keys: return plugins - # endif semaphore keys found - for instance in container_instances: - if not isinstance(instance, dict): + # Set SEMAPHORED_KEYS on all container instances + for container_plugin in container_plugins: + container_instances = container_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + if not isinstance(container_instances, list): continue - # endif container instance dict - instance["SEMAPHORED_KEYS"] = list(semaphore_keys) - # endfor each container instance + for instance in container_instances: + if not isinstance(instance, dict): + continue + instance["SEMAPHORED_KEYS"] = list(semaphore_keys) return plugins except Exception as exc: @@ -1727,11 +1895,13 @@ def deeploy_prepare_plugins(self, inputs): if plugins_array and isinstance(plugins_array, list): # Group plugin instances by signature plugins_by_signature = {} + name_to_instance = {} used_instance_ids = set() for plugin_instance in plugins_array: signature = plugin_instance.get(DEEPLOY_KEYS.PLUGIN_SIGNATURE) + plugin_name = plugin_instance.get(DEEPLOY_KEYS.PLUGIN_NAME) # Extract instance config (everything except metadata keys) instance_config = { @@ -1739,6 +1909,7 @@ def deeploy_prepare_plugins(self, inputs): if k not in { DEEPLOY_KEYS.PLUGIN_SIGNATURE, DEEPLOY_KEYS.PLUGIN_INSTANCE_ID, + DEEPLOY_KEYS.PLUGIN_NAME, self.ct.CONFIG_INSTANCE.K_INSTANCE_ID, "signature", "instance_id", @@ -1762,6 +1933,13 @@ def deeploy_prepare_plugins(self, inputs): **instance_config } + # Build name-to-instance mapping if plugin_name was provided + if plugin_name: + name_to_instance[plugin_name] = { + "instance_id": instance_id, + "signature": signature, + } + # Group by signature if signature not in plugins_by_signature: plugins_by_signature[signature] = [] @@ -1776,12 +1954,12 @@ def deeploy_prepare_plugins(self, inputs): } prepared_plugins.append(prepared_plugin) - return prepared_plugins + return prepared_plugins, name_to_instance # Legacy single-plugin format - use existing method plugin = self.deeploy_prepare_single_plugin_instance(inputs) plugins = [plugin] - return plugins + return plugins, {} def check_and_deploy_pipelines( self, owner, inputs, app_id, From 708bb2ca3d0ac677855b168ce7c5e23ffbd58aab Mon Sep 17 00:00:00 2001 From: toderian Date: Wed, 4 Mar 2026 11:00:58 +0200 Subject: [PATCH 04/10] Revert "fix: wrap liveness api services statuses check in try except" This reverts commit 9a913a72f1eaa3c3cf6703bd882c0cf022d9fc9a. Revert "fix: semaphored keys setup on shmem dynamic env dependencies" This reverts commit d111a6c318904e162bba0b6e1e59ef43071d37a4. --- extensions/business/deeploy/deeploy_const.py | 4 - .../business/deeploy/deeploy_manager_api.py | 3 +- extensions/business/deeploy/deeploy_mixin.py | 304 ++++-------------- extensions/business/liveness/liveness_api.py | 14 +- ver.py | 2 +- 5 files changed, 70 insertions(+), 257 deletions(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 44f8ca84..69ae69fd 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -52,8 +52,6 @@ class DEEPLOY_KEYS: PLUGINS = "plugins" PLUGIN_INSTANCES = "instances" PLUGIN_INSTANCE_ID = "instance_id" - PLUGIN_NAME = "plugin_name" - DEPENDENCY_TREE = "dependency_tree" # Auth result keys SENDER = "sender" SENDER_ESCROW = "sender_escrow" @@ -106,8 +104,6 @@ class DEEPLOY_ERRORS: PLUGINS1 = "ERR25_DEEPLOY_PLUGINS1" # Invalid plugins array structure PLUGINS2 = "ERR26_DEEPLOY_PLUGINS2" # Plugin missing required field PLUGINS3 = "ERR27_DEEPLOY_PLUGINS3" # Instance validation failed - DEPENDENCY_TREE1 = "ERR28_DEEPLOY_DEPENDENCY_TREE1" # Invalid dependency_tree entry - DEPENDENCY_TREE2 = "ERR29_DEEPLOY_DEPENDENCY_TREE2" # Circular dependency detected class DEEPLOY_RESOURCES: # Result dictionary keys diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index 063fee34..9ca53bfd 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -255,8 +255,7 @@ def _process_pipeline_request( if job_app_type not in JOB_APP_TYPES_ALL: raise ValueError(f"Invalid job_app_type '{job_app_type}'. Expected one of {JOB_APP_TYPES_ALL}.") else: - plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs) - job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection) + job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)) if job_app_type not in JOB_APP_TYPES_ALL: job_app_type = JOB_APP_TYPES.NATIVE self.P(f"Detected job app type: {job_app_type}") diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 75b19a95..9eac1679 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -100,10 +100,7 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, """ Create new pipelines on each node and set CSTORE `response_key` for the "callback" action """ - plugins, name_to_instance = self.deeploy_prepare_plugins(inputs) - if name_to_instance: - plugins = self._resolve_shmem_references(plugins, name_to_instance, app_id) - self._validate_dependency_tree(inputs) + plugins = self.deeploy_prepare_plugins(inputs) plugins = self._ensure_runner_cstore_auth_env( app_id=app_id, prepared_plugins=plugins, @@ -253,7 +250,7 @@ def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, ts = self.time() detected_job_app_type = job_app_type if not detected_job_app_type: - plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs) + plugins_for_detection = self.deeploy_prepare_plugins(inputs) detected_job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection) if not dct_deeploy_specs: @@ -1334,7 +1331,7 @@ def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create, debug=Fa if not job_app_type: try: self.Pd(" Detecting job app type from plugins...") - job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)[0]) + job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)) self.Pd(f" Detected job app type: {job_app_type}") except Exception as exc: self.Pd(f" Failed to detect job app type: {exc}") @@ -1457,209 +1454,35 @@ def extract_instance_confs(instances): signature, instances = normalized_plugins[0] normalized_signature = signature.upper() if isinstance(signature, str) else '' - if normalized_signature in (CONTAINER_APP_RUNNER_SIGNATURE, WORKER_APP_RUNNER_SIGNATURE): - # Multiple instances under one containerized signature → native multi-container - if len(instances) > 1: - return JOB_APP_TYPES.NATIVE - - if normalized_signature == CONTAINER_APP_RUNNER_SIGNATURE: - service_keywords = ('postgresql', 'postgres', 'mongo', 'mongodb', 'mysql', 'mssql') - for instance_conf in instances: - if not isinstance(instance_conf, dict): - continue - image_value = ( - instance_conf.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE) - or instance_conf.get('IMAGE') - or instance_conf.get('image') - ) - if image_value and any(keyword in str(image_value).lower() for keyword in service_keywords): - return JOB_APP_TYPES.SERVICE - - return JOB_APP_TYPES.GENERIC - - return JOB_APP_TYPES.NATIVE - - def _has_shmem_dynamic_env(self, plugins): - """Check if any plugin instance has shmem-type DYNAMIC_ENV entries.""" - for plugin in plugins: - instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - if not isinstance(instances, list): - continue - for instance in instances: - if not isinstance(instance, dict): - continue - dynamic_env = instance.get("DYNAMIC_ENV") - if not isinstance(dynamic_env, dict): + if normalized_signature == CONTAINER_APP_RUNNER_SIGNATURE: + service_keywords = ('postgresql', 'postgres', 'mongo', 'mongodb', 'mysql', 'mssql') + for instance_conf in instances: + if not isinstance(instance_conf, dict): continue - for _key, entries in dynamic_env.items(): - if not isinstance(entries, list): - continue - for entry in entries: - if isinstance(entry, dict) and entry.get("type") == "shmem": - return True - return False - - def _resolve_shmem_references(self, plugins, name_to_instance, app_id): - """ - Resolve shmem-type DYNAMIC_ENV entries by replacing plugin names with semaphore keys. - Sets SEMAPHORE on provider instances and SEMAPHORED_KEYS on consumer instances. - - Parameters - ---------- - plugins : list - Prepared plugins payload. - name_to_instance : dict - Mapping of plugin_name -> { instance_id, signature }. - app_id : str - Application identifier used to build semaphore keys. - - Returns - ------- - list - Modified plugins with shmem references resolved. - """ - try: - # Build name -> semaphore_key map - name_to_semaphore = {} - for pname, info in name_to_instance.items(): - iid = info.get("instance_id", "") - name_to_semaphore[pname] = self.sanitize_name("{}__{}".format(app_id, iid)) - - # Build instance_id -> instance reference for quick lookup - instance_id_to_ref = {} - for plugin in plugins: - instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - for instance in instances: - if isinstance(instance, dict): - iid = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) - if iid: - instance_id_to_ref[iid] = instance - - providers = set() # instance_ids that are referenced as providers - consumers = {} # instance_id -> set of semaphore keys they depend on - - # Scan all instances for shmem DYNAMIC_ENV entries - for plugin in plugins: - instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - for instance in instances: - if not isinstance(instance, dict): - continue - consumer_iid = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) - dynamic_env = instance.get("DYNAMIC_ENV") - if not isinstance(dynamic_env, dict): - continue - - for _key, entries in dynamic_env.items(): - if not isinstance(entries, list): - continue - for entry in entries: - if not isinstance(entry, dict) or entry.get("type") != "shmem": - continue - path = entry.get("path") - if not isinstance(path, (list, tuple)) or len(path) < 2: - continue - provider_name = path[0] - if provider_name not in name_to_semaphore: - self.Pd(f"Warning: shmem references unknown plugin '{provider_name}'", color='y') - continue - - semaphore_key = name_to_semaphore[provider_name] - # Replace plugin name in path with the semaphore key - entry["path"] = [semaphore_key, path[1]] - - # Track provider and consumer - provider_info = name_to_instance.get(provider_name, {}) - provider_iid = provider_info.get("instance_id") - if provider_iid: - providers.add(provider_iid) - if consumer_iid: - if consumer_iid not in consumers: - consumers[consumer_iid] = set() - consumers[consumer_iid].add(semaphore_key) - - # Set SEMAPHORE on each provider instance - for provider_iid in providers: - ref = instance_id_to_ref.get(provider_iid) - if ref is not None: - sem_key = self.sanitize_name("{}__{}".format(app_id, provider_iid)) - ref["SEMAPHORE"] = sem_key - - # Set SEMAPHORED_KEYS on each consumer instance - for consumer_iid, sem_keys in consumers.items(): - ref = instance_id_to_ref.get(consumer_iid) - if ref is not None: - ref["SEMAPHORED_KEYS"] = list(sem_keys) - - return plugins - except Exception as exc: - self.Pd(f"Failed to resolve shmem references: {exc}", color='y') - return plugins - - def _validate_dependency_tree(self, inputs): - """ - Validate dependency_tree from inputs for circular dependencies. - Raises ValueError if a cycle is detected. - """ - dep_tree = inputs.get(DEEPLOY_KEYS.DEPENDENCY_TREE) - if not dep_tree or not isinstance(dep_tree, list): - return - - # Validate structure - for edge in dep_tree: - if not isinstance(edge, (list, tuple)) or len(edge) != 2: - raise ValueError( - f"{DEEPLOY_ERRORS.DEPENDENCY_TREE1}: Invalid dependency_tree entry: {edge}. Each entry must be a [from, to] pair." + image_value = ( + instance_conf.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE) + or instance_conf.get('IMAGE') + or instance_conf.get('image') ) + if image_value and any(keyword in str(image_value).lower() for keyword in service_keywords): + return JOB_APP_TYPES.SERVICE + return JOB_APP_TYPES.GENERIC - # DFS cycle detection - graph = {} - for from_node, to_node in dep_tree: - if from_node not in graph: - graph[from_node] = [] - graph[from_node].append(to_node) - - WHITE, GRAY, BLACK = 0, 1, 2 - color = {node: WHITE for node in graph} - - def dfs(node): - color[node] = GRAY - for neighbor in graph.get(node, []): - if neighbor not in color: - color[neighbor] = WHITE - c = color[neighbor] - if c == GRAY: - return True - if c == WHITE and dfs(neighbor): - return True - color[node] = BLACK - return False + if normalized_signature == WORKER_APP_RUNNER_SIGNATURE: + return JOB_APP_TYPES.GENERIC - for node in list(graph.keys()): - if color.get(node, WHITE) == WHITE: - if dfs(node): - raise ValueError( - f"{DEEPLOY_ERRORS.DEPENDENCY_TREE2}: Circular dependency detected in dependency_tree." - ) - return + return JOB_APP_TYPES.NATIVE def _autowire_native_container_semaphore(self, app_id, plugins, job_app_type): """ - Auto-configure semaphore settings for native + container plugin groups. - Supports N plugins: all native plugins get SEMAPHORE, all container plugins - get SEMAPHORED_KEYS pointing to all native semaphore keys. - - Skips autowiring if: - - Not a native job app type - - Less than 2 plugins - - Semaphore config already exists (e.g., from explicit shmem wiring) - - Explicit shmem DYNAMIC_ENV entries detected + Auto-configure semaphore settings for native + container pairs. Parameters ---------- app_id : str Application identifier used to build deterministic semaphore keys. plugins : list - Prepared plugins payload. + Prepared plugins payload (expected to be a two-item native/container pair). job_app_type : str Detected job application type. @@ -1671,76 +1494,85 @@ def _autowire_native_container_semaphore(self, app_id, plugins, job_app_type): try: if job_app_type != JOB_APP_TYPES.NATIVE: return plugins + # endif native job app type - if not isinstance(plugins, list) or len(plugins) < 2: + if not isinstance(plugins, list) or len(plugins) != 2: return plugins + # endif plugin pair check def has_semaphore_config(plugin_list): for plugin in plugin_list: instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] if not isinstance(instances, list): continue + # endif instances is list for instance in instances: if not isinstance(instance, dict): continue + # endif instance is dict if "SEMAPHORE" in instance or "SEMAPHORED_KEYS" in instance: return True + # endif instance has semaphore config + # endfor each instance + # endfor each plugin return False if has_semaphore_config(plugins): self.Pd("Skipping semaphore autowire; semaphore config already provided.") return plugins + # endif skip when provided - # Skip autowiring if explicit shmem references are present - if self._has_shmem_dynamic_env(plugins): - self.Pd("Skipping semaphore autowire; explicit shmem references detected.") - return plugins - - # Collect all native and container plugins - native_plugins = [] - container_plugins = [] + container_plugin = None + native_plugin = None for plugin in plugins: signature = plugin.get(self.ct.CONFIG_PLUGIN.K_SIGNATURE) if not signature: continue + # endif signature check normalized_signature = str(signature).upper() if normalized_signature in CONTAINERIZED_APPS_SIGNATURES: - container_plugins.append(plugin) + container_plugin = container_plugin or plugin else: - native_plugins.append(plugin) + native_plugin = native_plugin or plugin + # endif signature type + # endfor each plugin + + if not container_plugin or not native_plugin: + return plugins + # endif both plugin types found - if not container_plugins or not native_plugins: + native_instances = native_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + container_instances = container_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + + if not isinstance(native_instances, list) or not isinstance(container_instances, list): return plugins + # endif instance lists - # Set SEMAPHORE on all native instances semaphore_keys = [] - for native_plugin in native_plugins: - native_instances = native_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - if not isinstance(native_instances, list): + for instance in native_instances: + if not isinstance(instance, dict): continue - for instance in native_instances: - if not isinstance(instance, dict): - continue - instance_id = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) - if not instance_id: - continue - semaphore_key = self.sanitize_name("{}__{}".format(app_id, instance_id)) - instance["SEMAPHORE"] = semaphore_key - semaphore_keys.append(semaphore_key) + # endif instance is dict + instance_id = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + if not instance_id: + continue + # endif instance id + semaphore_key = self.sanitize_name("{}__{}".format(app_id, instance_id)) + instance["SEMAPHORE"] = semaphore_key + semaphore_keys.append(semaphore_key) + # endfor each native instance if not semaphore_keys: return plugins + # endif semaphore keys found - # Set SEMAPHORED_KEYS on all container instances - for container_plugin in container_plugins: - container_instances = container_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - if not isinstance(container_instances, list): + for instance in container_instances: + if not isinstance(instance, dict): continue - for instance in container_instances: - if not isinstance(instance, dict): - continue - instance["SEMAPHORED_KEYS"] = list(semaphore_keys) + # endif container instance dict + instance["SEMAPHORED_KEYS"] = list(semaphore_keys) + # endfor each container instance return plugins except Exception as exc: @@ -1895,13 +1727,11 @@ def deeploy_prepare_plugins(self, inputs): if plugins_array and isinstance(plugins_array, list): # Group plugin instances by signature plugins_by_signature = {} - name_to_instance = {} used_instance_ids = set() for plugin_instance in plugins_array: signature = plugin_instance.get(DEEPLOY_KEYS.PLUGIN_SIGNATURE) - plugin_name = plugin_instance.get(DEEPLOY_KEYS.PLUGIN_NAME) # Extract instance config (everything except metadata keys) instance_config = { @@ -1909,7 +1739,6 @@ def deeploy_prepare_plugins(self, inputs): if k not in { DEEPLOY_KEYS.PLUGIN_SIGNATURE, DEEPLOY_KEYS.PLUGIN_INSTANCE_ID, - DEEPLOY_KEYS.PLUGIN_NAME, self.ct.CONFIG_INSTANCE.K_INSTANCE_ID, "signature", "instance_id", @@ -1933,13 +1762,6 @@ def deeploy_prepare_plugins(self, inputs): **instance_config } - # Build name-to-instance mapping if plugin_name was provided - if plugin_name: - name_to_instance[plugin_name] = { - "instance_id": instance_id, - "signature": signature, - } - # Group by signature if signature not in plugins_by_signature: plugins_by_signature[signature] = [] @@ -1954,12 +1776,12 @@ def deeploy_prepare_plugins(self, inputs): } prepared_plugins.append(prepared_plugin) - return prepared_plugins, name_to_instance + return prepared_plugins # Legacy single-plugin format - use existing method plugin = self.deeploy_prepare_single_plugin_instance(inputs) plugins = [plugin] - return plugins, {} + return plugins def check_and_deploy_pipelines( self, owner, inputs, app_id, diff --git a/extensions/business/liveness/liveness_api.py b/extensions/business/liveness/liveness_api.py index 87f13172..742cf30b 100644 --- a/extensions/business/liveness/liveness_api.py +++ b/extensions/business/liveness/liveness_api.py @@ -200,15 +200,11 @@ def __get_service_status(self, service_config): def __get_all_services_statuses(self): - try: - statuses = {} - for service in self.cfg_monitored_services.keys(): - cfg = self.__get_service_config(service) - statuses[service] = self.__get_service_status(cfg) - return statuses - except Exception as e: - self.P("Error while getting services statuses: {}".format(str(e))) - return {} + statuses = {} + for service in self.cfg_monitored_services.keys(): + cfg = self.__get_service_config(service) + statuses[service] = self.__get_service_status(cfg) + return statuses @BasePlugin.endpoint # /get_liveness_data diff --git a/ver.py b/ver.py index 355b0121..87547e9f 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.82' +__VER__ = '2.10.81' From 2cdaa07064f1fd0e0637fd89b1a1bf4098d196bc Mon Sep 17 00:00:00 2001 From: Vitalii <87299468+toderian@users.noreply.github.com> Date: Wed, 4 Mar 2026 18:02:30 +0200 Subject: [PATCH 05/10] Deeploy dynamic env semaphored keys (#369) * fix: wrap liveness api services statuses check in try except * fix: semaphored keys setup on shmem dynamic env dependencies * feat: add plugin semaphore map to deeploy specs * chore: increment version * fix: docstring for _autowire_native_container_semaphore * fix: docstring for _autowire_native_container_semaphore --- extensions/business/deeploy/deeploy_const.py | 4 + .../business/deeploy/deeploy_manager_api.py | 3 +- extensions/business/deeploy/deeploy_mixin.py | 347 ++++++++++++++---- extensions/business/liveness/liveness_api.py | 14 +- ver.py | 2 +- 5 files changed, 300 insertions(+), 70 deletions(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 69ae69fd..44f8ca84 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -52,6 +52,8 @@ class DEEPLOY_KEYS: PLUGINS = "plugins" PLUGIN_INSTANCES = "instances" PLUGIN_INSTANCE_ID = "instance_id" + PLUGIN_NAME = "plugin_name" + DEPENDENCY_TREE = "dependency_tree" # Auth result keys SENDER = "sender" SENDER_ESCROW = "sender_escrow" @@ -104,6 +106,8 @@ class DEEPLOY_ERRORS: PLUGINS1 = "ERR25_DEEPLOY_PLUGINS1" # Invalid plugins array structure PLUGINS2 = "ERR26_DEEPLOY_PLUGINS2" # Plugin missing required field PLUGINS3 = "ERR27_DEEPLOY_PLUGINS3" # Instance validation failed + DEPENDENCY_TREE1 = "ERR28_DEEPLOY_DEPENDENCY_TREE1" # Invalid dependency_tree entry + DEPENDENCY_TREE2 = "ERR29_DEEPLOY_DEPENDENCY_TREE2" # Circular dependency detected class DEEPLOY_RESOURCES: # Result dictionary keys diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index 9ca53bfd..063fee34 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -255,7 +255,8 @@ def _process_pipeline_request( if job_app_type not in JOB_APP_TYPES_ALL: raise ValueError(f"Invalid job_app_type '{job_app_type}'. Expected one of {JOB_APP_TYPES_ALL}.") else: - job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)) + plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs) + job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection) if job_app_type not in JOB_APP_TYPES_ALL: job_app_type = JOB_APP_TYPES.NATIVE self.P(f"Detected job app type: {job_app_type}") diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 9eac1679..1462fcd3 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -100,7 +100,18 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, """ Create new pipelines on each node and set CSTORE `response_key` for the "callback" action """ - plugins = self.deeploy_prepare_plugins(inputs) + plugins, name_to_instance = self.deeploy_prepare_plugins(inputs) + plugin_semaphore_map = {} + if name_to_instance: + plugins = self._resolve_shmem_references(plugins, name_to_instance, app_id) + # Build plugin_name → semaphore_key mapping only for actual providers + for plugin in plugins: + for instance in plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES, []): + sem_key = instance.get("SEMAPHORE") + pname = instance.get(DEEPLOY_KEYS.PLUGIN_NAME) + if sem_key and pname: + plugin_semaphore_map[pname] = sem_key + self._validate_dependency_tree(inputs) plugins = self._ensure_runner_cstore_auth_env( app_id=app_id, prepared_plugins=plugins, @@ -154,6 +165,9 @@ def __create_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, if detected_job_app_type in JOB_APP_TYPES_ALL: dct_deeploy_specs[DEEPLOY_KEYS.JOB_APP_TYPE] = detected_job_app_type + if plugin_semaphore_map: + dct_deeploy_specs[DEEPLOY_KEYS.JOB_CONFIG]["plugin_semaphore_map"] = plugin_semaphore_map + plugins = self._autowire_native_container_semaphore( app_id=app_id, plugins=plugins, @@ -250,7 +264,7 @@ def __update_pipeline_on_nodes(self, nodes, inputs, app_id, app_alias, app_type, ts = self.time() detected_job_app_type = job_app_type if not detected_job_app_type: - plugins_for_detection = self.deeploy_prepare_plugins(inputs) + plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs) detected_job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection) if not dct_deeploy_specs: @@ -1331,7 +1345,7 @@ def deeploy_check_payment_and_job_owner(self, inputs, owner, is_create, debug=Fa if not job_app_type: try: self.Pd(" Detecting job app type from plugins...") - job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)) + job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs)[0]) self.Pd(f" Detected job app type: {job_app_type}") except Exception as exc: self.Pd(f" Failed to detect job app type: {exc}") @@ -1454,35 +1468,217 @@ def extract_instance_confs(instances): signature, instances = normalized_plugins[0] normalized_signature = signature.upper() if isinstance(signature, str) else '' - if normalized_signature == CONTAINER_APP_RUNNER_SIGNATURE: - service_keywords = ('postgresql', 'postgres', 'mongo', 'mongodb', 'mysql', 'mssql') - for instance_conf in instances: - if not isinstance(instance_conf, dict): - continue - image_value = ( - instance_conf.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE) - or instance_conf.get('IMAGE') - or instance_conf.get('image') - ) - if image_value and any(keyword in str(image_value).lower() for keyword in service_keywords): - return JOB_APP_TYPES.SERVICE - return JOB_APP_TYPES.GENERIC + if normalized_signature in (CONTAINER_APP_RUNNER_SIGNATURE, WORKER_APP_RUNNER_SIGNATURE): + # Multiple instances under one containerized signature → native multi-container + if len(instances) > 1: + return JOB_APP_TYPES.NATIVE + + if normalized_signature == CONTAINER_APP_RUNNER_SIGNATURE: + service_keywords = ('postgresql', 'postgres', 'mongo', 'mongodb', 'mysql', 'mssql') + for instance_conf in instances: + if not isinstance(instance_conf, dict): + continue + image_value = ( + instance_conf.get(DEEPLOY_KEYS.APP_PARAMS_IMAGE) + or instance_conf.get('IMAGE') + or instance_conf.get('image') + ) + if image_value and any(keyword in str(image_value).lower() for keyword in service_keywords): + return JOB_APP_TYPES.SERVICE - if normalized_signature == WORKER_APP_RUNNER_SIGNATURE: return JOB_APP_TYPES.GENERIC return JOB_APP_TYPES.NATIVE + def _has_shmem_dynamic_env(self, plugins): + """Check if any plugin instance has shmem-type DYNAMIC_ENV entries.""" + for plugin in plugins: + instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + if not isinstance(instances, list): + continue + for instance in instances: + if not isinstance(instance, dict): + continue + dynamic_env = instance.get("DYNAMIC_ENV") + if not isinstance(dynamic_env, dict): + continue + for _key, entries in dynamic_env.items(): + if not isinstance(entries, list): + continue + for entry in entries: + if isinstance(entry, dict) and entry.get("type") == "shmem": + return True + return False + + def _resolve_shmem_references(self, plugins, name_to_instance, app_id): + """ + Resolve shmem-type DYNAMIC_ENV entries by replacing plugin names with semaphore keys. + Sets SEMAPHORE on provider instances and SEMAPHORED_KEYS on consumer instances. + + Parameters + ---------- + plugins : list + Prepared plugins payload. + name_to_instance : dict + Mapping of plugin_name -> { instance_id, signature }. + app_id : str + Application identifier used to build semaphore keys. + + Returns + ------- + list + Modified plugins with shmem references resolved. + """ + try: + # Build name -> semaphore_key map + name_to_semaphore = {} + for pname, info in name_to_instance.items(): + iid = info.get("instance_id", "") + name_to_semaphore[pname] = self.sanitize_name("{}__{}".format(app_id, iid)) + + # Build instance_id -> instance reference for quick lookup + instance_id_to_ref = {} + for plugin in plugins: + instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + for instance in instances: + if isinstance(instance, dict): + iid = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + if iid: + instance_id_to_ref[iid] = instance + + providers = set() # instance_ids that are referenced as providers + consumers = {} # instance_id -> set of semaphore keys they depend on + + # Scan all instances for shmem DYNAMIC_ENV entries + for plugin in plugins: + instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + for instance in instances: + if not isinstance(instance, dict): + continue + consumer_iid = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + dynamic_env = instance.get("DYNAMIC_ENV") + if not isinstance(dynamic_env, dict): + continue + + for _key, entries in dynamic_env.items(): + if not isinstance(entries, list): + continue + for entry in entries: + if not isinstance(entry, dict) or entry.get("type") != "shmem": + continue + path = entry.get("path") + if not isinstance(path, (list, tuple)) or len(path) < 2: + continue + provider_name = path[0] + if provider_name not in name_to_semaphore: + self.Pd(f"Warning: shmem references unknown plugin '{provider_name}'", color='y') + continue + + semaphore_key = name_to_semaphore[provider_name] + # Replace plugin name in path with the semaphore key + entry["path"] = [semaphore_key, path[1]] + + # Track provider and consumer + provider_info = name_to_instance.get(provider_name, {}) + provider_iid = provider_info.get("instance_id") + if provider_iid: + providers.add(provider_iid) + if consumer_iid: + if consumer_iid not in consumers: + consumers[consumer_iid] = set() + consumers[consumer_iid].add(semaphore_key) + + # Set SEMAPHORE on each provider instance + for provider_iid in providers: + ref = instance_id_to_ref.get(provider_iid) + if ref is not None: + sem_key = self.sanitize_name("{}__{}".format(app_id, provider_iid)) + ref["SEMAPHORE"] = sem_key + + # Set SEMAPHORED_KEYS on each consumer instance + for consumer_iid, sem_keys in consumers.items(): + ref = instance_id_to_ref.get(consumer_iid) + if ref is not None: + ref["SEMAPHORED_KEYS"] = list(sem_keys) + + return plugins + except Exception as exc: + self.Pd(f"Failed to resolve shmem references: {exc}", color='y') + return plugins + + def _validate_dependency_tree(self, inputs): + """ + Validate dependency_tree from inputs for circular dependencies. + Raises ValueError if a cycle is detected. + """ + dep_tree = inputs.get(DEEPLOY_KEYS.DEPENDENCY_TREE) + if not dep_tree or not isinstance(dep_tree, list): + return + + # Validate structure + for edge in dep_tree: + if not isinstance(edge, (list, tuple)) or len(edge) != 2: + raise ValueError( + f"{DEEPLOY_ERRORS.DEPENDENCY_TREE1}: Invalid dependency_tree entry: {edge}. Each entry must be a [from, to] pair." + ) + + # DFS cycle detection + graph = {} + for from_node, to_node in dep_tree: + if from_node not in graph: + graph[from_node] = [] + graph[from_node].append(to_node) + + WHITE, GRAY, BLACK = 0, 1, 2 + color = {node: WHITE for node in graph} + + def dfs(node): + color[node] = GRAY + for neighbor in graph.get(node, []): + if neighbor not in color: + color[neighbor] = WHITE + c = color[neighbor] + if c == GRAY: + return True + if c == WHITE and dfs(neighbor): + return True + color[node] = BLACK + return False + + for node in list(graph.keys()): + if color.get(node, WHITE) == WHITE: + if dfs(node): + raise ValueError( + f"{DEEPLOY_ERRORS.DEPENDENCY_TREE2}: Circular dependency detected in dependency_tree." + ) + return + def _autowire_native_container_semaphore(self, app_id, plugins, job_app_type): """ - Auto-configure semaphore settings for native + container pairs. + Auto-configure semaphore settings for a native job that has one native plugin + and one containerized plugin (CONTAINER_APP_RUNNER or WORKER_APP_RUNNER). + + The native plugin acts as the orchestrator — it receives a SEMAPHORE key so that + containerized plugins can wait for it to be ready. Each containerized plugin + instance receives SEMAPHORED_KEYS containing all native semaphore keys. + + This is only meant for the simple native + CAR/WAR pattern. When the user + provides explicit shmem DYNAMIC_ENV references or manual SEMAPHORE / + SEMAPHORED_KEYS config, this autowiring is skipped entirely and the user's + explicit dependency tree takes precedence. + + Skips autowiring when: + - job_app_type is not NATIVE + - Fewer than 2 signature groups (need both a native and a containerized group) + - SEMAPHORE / SEMAPHORED_KEYS already present on any instance + - Explicit shmem DYNAMIC_ENV entries detected (user-defined dependency tree) Parameters ---------- app_id : str Application identifier used to build deterministic semaphore keys. plugins : list - Prepared plugins payload (expected to be a two-item native/container pair). + Prepared plugins payload (one entry per signature group, each with INSTANCES). job_app_type : str Detected job application type. @@ -1494,85 +1690,100 @@ def _autowire_native_container_semaphore(self, app_id, plugins, job_app_type): try: if job_app_type != JOB_APP_TYPES.NATIVE: return plugins - # endif native job app type + # end if - if not isinstance(plugins, list) or len(plugins) != 2: + if not isinstance(plugins, list) or len(plugins) < 2: return plugins - # endif plugin pair check + # end if def has_semaphore_config(plugin_list): for plugin in plugin_list: instances = plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] if not isinstance(instances, list): continue - # endif instances is list + # end if for instance in instances: if not isinstance(instance, dict): continue - # endif instance is dict + # end if if "SEMAPHORE" in instance or "SEMAPHORED_KEYS" in instance: return True - # endif instance has semaphore config - # endfor each instance - # endfor each plugin + # end if + # end for instance + # end for plugin return False + # end has_semaphore_config if has_semaphore_config(plugins): self.Pd("Skipping semaphore autowire; semaphore config already provided.") return plugins - # endif skip when provided + # end if - container_plugin = None - native_plugin = None + # Skip autowiring if explicit shmem references are present + if self._has_shmem_dynamic_env(plugins): + self.Pd("Skipping semaphore autowire; explicit shmem references detected.") + return plugins + # end if + + # Collect all native and container plugins + native_plugins = [] + container_plugins = [] for plugin in plugins: signature = plugin.get(self.ct.CONFIG_PLUGIN.K_SIGNATURE) if not signature: continue - # endif signature check + # end if normalized_signature = str(signature).upper() if normalized_signature in CONTAINERIZED_APPS_SIGNATURES: - container_plugin = container_plugin or plugin + container_plugins.append(plugin) else: - native_plugin = native_plugin or plugin - # endif signature type - # endfor each plugin - - if not container_plugin or not native_plugin: - return plugins - # endif both plugin types found - - native_instances = native_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] - container_instances = container_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + native_plugins.append(plugin) + # end if + # end for plugin - if not isinstance(native_instances, list) or not isinstance(container_instances, list): + if not container_plugins or not native_plugins: return plugins - # endif instance lists + # end if + # Set SEMAPHORE on all native instances semaphore_keys = [] - for instance in native_instances: - if not isinstance(instance, dict): + for native_plugin in native_plugins: + native_instances = native_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + if not isinstance(native_instances, list): continue - # endif instance is dict - instance_id = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) - if not instance_id: - continue - # endif instance id - semaphore_key = self.sanitize_name("{}__{}".format(app_id, instance_id)) - instance["SEMAPHORE"] = semaphore_key - semaphore_keys.append(semaphore_key) - # endfor each native instance + # end if + for instance in native_instances: + if not isinstance(instance, dict): + continue + # end if + instance_id = instance.get(self.ct.CONFIG_INSTANCE.K_INSTANCE_ID) + if not instance_id: + continue + # end if + semaphore_key = self.sanitize_name("{}__{}".format(app_id, instance_id)) + instance["SEMAPHORE"] = semaphore_key + semaphore_keys.append(semaphore_key) + # end for instance + # end for native_plugin if not semaphore_keys: return plugins - # endif semaphore keys found + # end if - for instance in container_instances: - if not isinstance(instance, dict): + # Set SEMAPHORED_KEYS on all container instances + for container_plugin in container_plugins: + container_instances = container_plugin.get(self.ct.CONFIG_PLUGIN.K_INSTANCES) or [] + if not isinstance(container_instances, list): continue - # endif container instance dict - instance["SEMAPHORED_KEYS"] = list(semaphore_keys) - # endfor each container instance + # end if + for instance in container_instances: + if not isinstance(instance, dict): + continue + # end if + instance["SEMAPHORED_KEYS"] = list(semaphore_keys) + # end for instance + # end for container_plugin return plugins except Exception as exc: @@ -1727,13 +1938,16 @@ def deeploy_prepare_plugins(self, inputs): if plugins_array and isinstance(plugins_array, list): # Group plugin instances by signature plugins_by_signature = {} + name_to_instance = {} used_instance_ids = set() for plugin_instance in plugins_array: signature = plugin_instance.get(DEEPLOY_KEYS.PLUGIN_SIGNATURE) + plugin_name = plugin_instance.get(DEEPLOY_KEYS.PLUGIN_NAME) # Extract instance config (everything except metadata keys) + # Note: plugin_name is intentionally kept so it can be recovered during job editing instance_config = { k: v for k, v in plugin_instance.items() if k not in { @@ -1762,6 +1976,13 @@ def deeploy_prepare_plugins(self, inputs): **instance_config } + # Build name-to-instance mapping if plugin_name was provided + if plugin_name: + name_to_instance[plugin_name] = { + "instance_id": instance_id, + "signature": signature, + } + # Group by signature if signature not in plugins_by_signature: plugins_by_signature[signature] = [] @@ -1776,12 +1997,12 @@ def deeploy_prepare_plugins(self, inputs): } prepared_plugins.append(prepared_plugin) - return prepared_plugins + return prepared_plugins, name_to_instance # Legacy single-plugin format - use existing method plugin = self.deeploy_prepare_single_plugin_instance(inputs) plugins = [plugin] - return plugins + return plugins, {} def check_and_deploy_pipelines( self, owner, inputs, app_id, diff --git a/extensions/business/liveness/liveness_api.py b/extensions/business/liveness/liveness_api.py index 742cf30b..87f13172 100644 --- a/extensions/business/liveness/liveness_api.py +++ b/extensions/business/liveness/liveness_api.py @@ -200,11 +200,15 @@ def __get_service_status(self, service_config): def __get_all_services_statuses(self): - statuses = {} - for service in self.cfg_monitored_services.keys(): - cfg = self.__get_service_config(service) - statuses[service] = self.__get_service_status(cfg) - return statuses + try: + statuses = {} + for service in self.cfg_monitored_services.keys(): + cfg = self.__get_service_config(service) + statuses[service] = self.__get_service_status(cfg) + return statuses + except Exception as e: + self.P("Error while getting services statuses: {}".format(str(e))) + return {} @BasePlugin.endpoint # /get_liveness_data diff --git a/ver.py b/ver.py index 87547e9f..17dd3cc3 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.81' +__VER__ = '2.10.83' From 79884570aaac3880d1f6bbdec33257f77d529b14 Mon Sep 17 00:00:00 2001 From: Vitalii <87299468+toderian@users.noreply.github.com> Date: Thu, 5 Mar 2026 00:07:08 +0200 Subject: [PATCH 06/10] fix: war custom entrypoint (#370) --- .../container_apps/container_app_runner.py | 24 +++++++++++++++++-- .../container_apps/worker_app_runner.py | 4 +++- ver.py | 2 +- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index f3a4af01..5459798b 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -260,7 +260,9 @@ def from_dict(cls, config_dict: dict) -> "HealthCheckConfig": # Container-specific config options "IMAGE": None, # Required container image, e.g. "my_repo/my_app:latest" + "CONTAINER_ENTRYPOINT": None, # Optional entrypoint override for the container "CONTAINER_START_COMMAND": None, # Optional command list executed when launching the container + "CONTAINER_USER": None, # Optional user override for the container (e.g. "root", "0:0") "BUILD_AND_RUN_COMMANDS": [], # Optional commands executed inside the running container "CR_DATA": { # dict of container registry data "SERVER": 'docker.io', # Optional container registry URL @@ -1209,6 +1211,11 @@ def _validate_runner_config(self): ValueError If configuration is invalid """ + self._entrypoint = self._normalize_container_command( + getattr(self, 'cfg_container_entrypoint', None), + field_name='CONTAINER_ENTRYPOINT', + ) + self._start_command = self._normalize_container_command( getattr(self, 'cfg_container_start_command', None), field_name='CONTAINER_START_COMMAND', @@ -2060,7 +2067,9 @@ def start_container(self): log_str += f" Resources: {self.json_dumps(self.cfg_container_resources) if self.cfg_container_resources else 'None'}\n" log_str += f" Restart policy: {self.cfg_restart_policy}\n" log_str += f" Pull policy: {self.cfg_image_pull_policy}\n" + log_str += f" Entrypoint: {self._entrypoint if self._entrypoint else 'Image default'}\n" log_str += f" Start command: {self._start_command if self._start_command else 'Image default'}\n" + log_str += f" User: {self.cfg_container_user if self.cfg_container_user else 'Image default'}\n" self.P(log_str) @@ -2098,9 +2107,15 @@ def start_container(self): # endif try: + if self._entrypoint: + run_kwargs['entrypoint'] = self._entrypoint + if self._start_command: run_kwargs['command'] = self._start_command + if self.cfg_container_user: + run_kwargs['user'] = self.cfg_container_user + self.container = self.docker_client.containers.run( self.cfg_image, **run_kwargs, @@ -2300,11 +2315,16 @@ def _run_container_exec(self, shell_cmd): return self.P(f"Running container exec command: {shell_cmd}") - exec_result = self.container.exec_run( - ["sh", "-c", shell_cmd], + exec_kwargs = dict( stream=True, detach=False, ) + if self.cfg_container_user: + exec_kwargs['user'] = self.cfg_container_user + exec_result = self.container.exec_run( + ["sh", "-c", shell_cmd], + **exec_kwargs, + ) thread = threading.Thread( target=self._stream_logs, args=(exec_result.output,), diff --git a/extensions/business/container_apps/worker_app_runner.py b/extensions/business/container_apps/worker_app_runner.py index 8d5b50bd..04366d2e 100644 --- a/extensions/business/container_apps/worker_app_runner.py +++ b/extensions/business/container_apps/worker_app_runner.py @@ -30,7 +30,9 @@ "CAR_VERBOSE": 10, "IMAGE": "node:22", - "CONTAINER_START_COMMAND": ["sh", "-c", "while true; do sleep 3600; done"], + "CONTAINER_ENTRYPOINT": "sh", + "CONTAINER_START_COMMAND": ["-c", "while true; do sleep 3600; done"], + "CONTAINER_USER": "root", "BUILD_AND_RUN_COMMANDS": ["npm install", "npm run build", "npm start"], "SETUP_REPO": True, # defines if we have to set up the repo (should add git clone commands or not) diff --git a/ver.py b/ver.py index 17dd3cc3..9cfb59fb 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.83' +__VER__ = '2.10.84' From 2210920af57b03292dd4b92be0803a7b7b7ad104 Mon Sep 17 00:00:00 2001 From: Vitalii <87299468+toderian@users.noreply.github.com> Date: Thu, 5 Mar 2026 01:56:59 +0200 Subject: [PATCH 07/10] Fix car image path (#371) * fix: car image path generation * chore: inc version --- .../container_apps/container_app_runner.py | 27 +++++++++++-------- .../container_apps/container_utils.py | 23 ++++++++++++++++ ver.py | 2 +- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index 5459798b..866bb209 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -2117,7 +2117,7 @@ def start_container(self): run_kwargs['user'] = self.cfg_container_user self.container = self.docker_client.containers.run( - self.cfg_image, + self._get_full_image_ref(), **run_kwargs, ) @@ -2838,8 +2838,9 @@ def _get_local_image(self): if not self.cfg_image: return None + full_image = self._get_full_image_ref() try: - img = self.docker_client.images.get(self.cfg_image) + img = self.docker_client.images.get(full_image) return img except Exception: return None @@ -2863,15 +2864,16 @@ def _pull_image_from_registry(self): self.P("No Docker image configured", color='r') return None + full_image = self._get_full_image_ref() try: - self.P(f"Pulling image '{self.cfg_image}'...") - img = self.docker_client.images.pull(self.cfg_image) + self.P(f"Pulling image '{full_image}'...") + img = self.docker_client.images.pull(full_image) # docker-py may return Image or list[Image] if isinstance(img, list) and img: img = img[-1] - self.P(f"Successfully pulled image '{self.cfg_image}'") + self.P(f"Successfully pulled image '{full_image}'") return img except Exception as e: @@ -2899,13 +2901,15 @@ def _pull_image_with_fallback(self): RuntimeError If authentication fails and no local image exists """ + full_image = self._get_full_image_ref() + # Step 1: Authenticate with registry if not self._login_to_registry(): self.P("Registry authentication failed", color='r') # Try to use local image if authentication fails local_img = self._get_local_image() if local_img: - self.P(f"Using local image (registry login failed): {self.cfg_image}", color='r') + self.P(f"Using local image (registry login failed): {full_image}", color='r') return local_img raise RuntimeError("Failed to authenticate with registry and no local image available.") @@ -2915,14 +2919,14 @@ def _pull_image_with_fallback(self): return img # Step 3: Fallback to local image - self.P(f"Pull failed, checking for local image: {self.cfg_image}", color='r') + self.P(f"Pull failed, checking for local image: {full_image}", color='r') local_img = self._get_local_image() if local_img: - self.P(f"Using local image as fallback: {self.cfg_image}", color='r') + self.P(f"Using local image as fallback: {full_image}", color='r') return local_img # Step 4: No image available - self.P(f"No image available (pull failed and no local image): {self.cfg_image}", color='r') + self.P(f"No image available (pull failed and no local image): {full_image}", color='r') return None @@ -3172,14 +3176,15 @@ def _ensure_image_if_not_present(self): bool True if image is available (locally or after pull), False otherwise """ + full_image = self._get_full_image_ref() # Check if image exists locally local_img = self._get_local_image() if local_img: - self.P(f"Image '{self.cfg_image}' found locally") + self.P(f"Image '{full_image}' found locally") return True # Image not found locally, pull it - self.P(f"Image not found locally, pulling '{self.cfg_image}'...") + self.P(f"Image not found locally, pulling '{full_image}'...") img = self._pull_image_with_fallback() return img is not None diff --git a/extensions/business/container_apps/container_utils.py b/extensions/business/container_apps/container_utils.py index a54e8e9d..47d501cd 100644 --- a/extensions/business/container_apps/container_utils.py +++ b/extensions/business/container_apps/container_utils.py @@ -28,6 +28,29 @@ def _get_cr_data(self): cr_password = cr_data.get('PASSWORD') return cr_server, cr_username, cr_password + def _get_full_image_ref(self): + """ + Get the full Docker image reference including the registry server prefix. + + For non-Docker-Hub registries (e.g. ghcr.io), the server must be prepended + to the image name so Docker pulls from the correct registry. + + Returns: + str: Full image reference (e.g. 'ghcr.io/misp/misp-docker/misp-core:latest') + """ + image = self.cfg_image + if not image: + return image + cr_server, _, _ = self._get_cr_data() + if not cr_server or cr_server.lower() in ('docker.io', 'registry.hub.docker.com'): + return image + # end if + # Avoid double-prefixing if user already included the registry in image name + if image.lower().startswith(cr_server.lower() + '/'): + return image + # end if + return f"{cr_server}/{image}" + def _login_to_registry(self): """ Login to a private container registry using credentials from _get_cr_data. diff --git a/ver.py b/ver.py index 9cfb59fb..6390a9be 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.84' +__VER__ = '2.10.85' From 6c4b95cc01381d0f8037cc0b4f708074c7c78bc6 Mon Sep 17 00:00:00 2001 From: toderian Date: Thu, 5 Mar 2026 02:40:38 +0200 Subject: [PATCH 08/10] fix(HOT): setup dynamic env on container restart | consumer containers wait for provider on restart --- .../container_apps/container_app_runner.py | 20 +++++++++++++++++++ ver.py | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/extensions/business/container_apps/container_app_runner.py b/extensions/business/container_apps/container_app_runner.py index 866bb209..d6327a90 100644 --- a/extensions/business/container_apps/container_app_runner.py +++ b/extensions/business/container_apps/container_app_runner.py @@ -3121,9 +3121,29 @@ def _restart_container(self, stop_reason=None): # Set state after reset self._set_container_state(ContainerState.RESTARTING, stop_reason or StopReason.UNKNOWN) + # Re-login to registry (reset_vars creates a new Docker client, losing the session) + self._login_to_registry() + self._setup_resource_limits_and_ports() self._configure_volumes() self._configure_file_volumes() + + # For semaphored containers (consumers), defer env setup and container start + # to _handle_initial_launch() which properly waits for provider semaphores. + # This is critical when all containers in a pipeline restart simultaneously + # (e.g., via RESTART command) — providers need time to publish shmem values. + if self._semaphore_get_keys(): + # Reset semaphore wait state so _wait_for_semaphores logs fresh status + if hasattr(self, '_semaphore_wait_logged'): + del self._semaphore_wait_logged + + self._validate_extra_tunnels_config() + self._validate_runner_config() + self.P("Consumer container with semaphore dependencies: deferring start until providers are ready") + return + + # Non-semaphored containers (providers): configure env and start immediately + self._configure_dynamic_env() self._setup_env_and_ports() # Revalidate extra tunnels diff --git a/ver.py b/ver.py index 6390a9be..e81fa170 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.85' +__VER__ = '2.10.86' From a05569d1f88087a4d8b4ca153e1bcdf041f9aee2 Mon Sep 17 00:00:00 2001 From: Cristi Bleotiu Date: Thu, 5 Mar 2026 11:17:59 +0200 Subject: [PATCH 09/10] chore: inc ver for core package sync --- ver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ver.py b/ver.py index e81fa170..6d50bcb8 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.86' +__VER__ = '2.10.87' From 938692e52a4cc31f5865853d1b4605328534e96e Mon Sep 17 00:00:00 2001 From: Cristi Bleotiu Date: Thu, 5 Mar 2026 12:08:23 +0200 Subject: [PATCH 10/10] chore: inc ver for merge in main --- ver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ver.py b/ver.py index 6d50bcb8..47687a8c 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.87' +__VER__ = '2.10.90'