Skip to content
80 changes: 64 additions & 16 deletions extensions/business/container_apps/container_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ def from_dict(cls, config_dict: dict) -> "HealthCheckConfig":

# Container-specific config options
"IMAGE": None, # Required container image, e.g. "my_repo/my_app:latest"
"CONTAINER_ENTRYPOINT": None, # Optional entrypoint override for the container
"CONTAINER_START_COMMAND": None, # Optional command list executed when launching the container
"CONTAINER_USER": None, # Optional user override for the container (e.g. "root", "0:0")
"BUILD_AND_RUN_COMMANDS": [], # Optional commands executed inside the running container
"CR_DATA": { # dict of container registry data
"SERVER": 'docker.io', # Optional container registry URL
Expand Down Expand Up @@ -1209,6 +1211,11 @@ def _validate_runner_config(self):
ValueError
If configuration is invalid
"""
self._entrypoint = self._normalize_container_command(
getattr(self, 'cfg_container_entrypoint', None),
field_name='CONTAINER_ENTRYPOINT',
)

self._start_command = self._normalize_container_command(
getattr(self, 'cfg_container_start_command', None),
field_name='CONTAINER_START_COMMAND',
Expand Down Expand Up @@ -1282,7 +1289,6 @@ def on_init(self):

self.reset_tunnel_engine()

self._configure_dynamic_env() # setup dynamic env vars for the container
self._setup_resource_limits_and_ports() # setup container resource limits (CPU, GPU, memory, ports)
self._configure_volumes() # setup container volumes
self._configure_file_volumes() # setup file volumes with dynamic content
Expand Down Expand Up @@ -2061,7 +2067,9 @@ def start_container(self):
log_str += f" Resources: {self.json_dumps(self.cfg_container_resources) if self.cfg_container_resources else 'None'}\n"
log_str += f" Restart policy: {self.cfg_restart_policy}\n"
log_str += f" Pull policy: {self.cfg_image_pull_policy}\n"
log_str += f" Entrypoint: {self._entrypoint if self._entrypoint else 'Image default'}\n"
log_str += f" Start command: {self._start_command if self._start_command else 'Image default'}\n"
log_str += f" User: {self.cfg_container_user if self.cfg_container_user else 'Image default'}\n"

self.P(log_str)

Expand Down Expand Up @@ -2099,11 +2107,17 @@ def start_container(self):
# endif

try:
if self._entrypoint:
run_kwargs['entrypoint'] = self._entrypoint

if self._start_command:
run_kwargs['command'] = self._start_command

if self.cfg_container_user:
run_kwargs['user'] = self.cfg_container_user

self.container = self.docker_client.containers.run(
self.cfg_image,
self._get_full_image_ref(),
**run_kwargs,
)

Expand Down Expand Up @@ -2301,11 +2315,16 @@ def _run_container_exec(self, shell_cmd):
return

self.P(f"Running container exec command: {shell_cmd}")
exec_result = self.container.exec_run(
["sh", "-c", shell_cmd],
exec_kwargs = dict(
stream=True,
detach=False,
)
if self.cfg_container_user:
exec_kwargs['user'] = self.cfg_container_user
exec_result = self.container.exec_run(
["sh", "-c", shell_cmd],
**exec_kwargs,
)
thread = threading.Thread(
target=self._stream_logs,
args=(exec_result.output,),
Expand Down Expand Up @@ -2601,6 +2620,10 @@ def _setup_semaphore_env(self):
if port:
self.semaphore_set_env('PORT', str(port))
self.semaphore_set_env('URL', 'http://{}:{}'.format(localhost_ip, port))
container_ip = self._get_container_ip()
self.Pd(f"Container IP address: {container_ip}")
if container_ip:
self.semaphore_set_env('CONTAINER_IP', container_ip)
return


Expand Down Expand Up @@ -2815,8 +2838,9 @@ def _get_local_image(self):
if not self.cfg_image:
return None

full_image = self._get_full_image_ref()
try:
img = self.docker_client.images.get(self.cfg_image)
img = self.docker_client.images.get(full_image)
return img
except Exception:
return None
Expand All @@ -2840,15 +2864,16 @@ def _pull_image_from_registry(self):
self.P("No Docker image configured", color='r')
return None

full_image = self._get_full_image_ref()
try:
self.P(f"Pulling image '{self.cfg_image}'...")
img = self.docker_client.images.pull(self.cfg_image)
self.P(f"Pulling image '{full_image}'...")
img = self.docker_client.images.pull(full_image)

# docker-py may return Image or list[Image]
if isinstance(img, list) and img:
img = img[-1]

self.P(f"Successfully pulled image '{self.cfg_image}'")
self.P(f"Successfully pulled image '{full_image}'")
return img

except Exception as e:
Expand Down Expand Up @@ -2876,13 +2901,15 @@ def _pull_image_with_fallback(self):
RuntimeError
If authentication fails and no local image exists
"""
full_image = self._get_full_image_ref()

# Step 1: Authenticate with registry
if not self._login_to_registry():
self.P("Registry authentication failed", color='r')
# Try to use local image if authentication fails
local_img = self._get_local_image()
if local_img:
self.P(f"Using local image (registry login failed): {self.cfg_image}", color='r')
self.P(f"Using local image (registry login failed): {full_image}", color='r')
return local_img
raise RuntimeError("Failed to authenticate with registry and no local image available.")

Expand All @@ -2892,14 +2919,14 @@ def _pull_image_with_fallback(self):
return img

# Step 3: Fallback to local image
self.P(f"Pull failed, checking for local image: {self.cfg_image}", color='r')
self.P(f"Pull failed, checking for local image: {full_image}", color='r')
local_img = self._get_local_image()
if local_img:
self.P(f"Using local image as fallback: {self.cfg_image}", color='r')
self.P(f"Using local image as fallback: {full_image}", color='r')
return local_img

# Step 4: No image available
self.P(f"No image available (pull failed and no local image): {self.cfg_image}", color='r')
self.P(f"No image available (pull failed and no local image): {full_image}", color='r')
return None


Expand Down Expand Up @@ -3094,10 +3121,29 @@ def _restart_container(self, stop_reason=None):
# Set state after reset
self._set_container_state(ContainerState.RESTARTING, stop_reason or StopReason.UNKNOWN)

self._configure_dynamic_env()
# Re-login to registry (reset_vars creates a new Docker client, losing the session)
self._login_to_registry()

self._setup_resource_limits_and_ports()
self._configure_volumes()
self._configure_file_volumes()

# For semaphored containers (consumers), defer env setup and container start
# to _handle_initial_launch() which properly waits for provider semaphores.
# This is critical when all containers in a pipeline restart simultaneously
# (e.g., via RESTART command) — providers need time to publish shmem values.
if self._semaphore_get_keys():
# Reset semaphore wait state so _wait_for_semaphores logs fresh status
if hasattr(self, '_semaphore_wait_logged'):
del self._semaphore_wait_logged

self._validate_extra_tunnels_config()
self._validate_runner_config()
self.P("Consumer container with semaphore dependencies: deferring start until providers are ready")
return

# Non-semaphored containers (providers): configure env and start immediately
self._configure_dynamic_env()
self._setup_env_and_ports()

# Revalidate extra tunnels
Expand Down Expand Up @@ -3150,14 +3196,15 @@ def _ensure_image_if_not_present(self):
bool
True if image is available (locally or after pull), False otherwise
"""
full_image = self._get_full_image_ref()
# Check if image exists locally
local_img = self._get_local_image()
if local_img:
self.P(f"Image '{self.cfg_image}' found locally")
self.P(f"Image '{full_image}' found locally")
return True

# Image not found locally, pull it
self.P(f"Image not found locally, pulling '{self.cfg_image}'...")
self.P(f"Image not found locally, pulling '{full_image}'...")
img = self._pull_image_with_fallback()
return img is not None

Expand Down Expand Up @@ -3278,7 +3325,8 @@ def _handle_initial_launch(self):
if not self._wait_for_semaphores():
return # Still
# end if
# Semaphores ready - now setup env vars with semaphore values
# Semaphores ready - dynamic env to resolve shmem values, then setup env
self._configure_dynamic_env()
self._setup_env_and_ports()
# end if

Expand Down
93 changes: 87 additions & 6 deletions extensions/business/container_apps/container_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,29 @@ def _get_cr_data(self):
cr_password = cr_data.get('PASSWORD')
return cr_server, cr_username, cr_password

def _get_full_image_ref(self):
"""
Get the full Docker image reference including the registry server prefix.

For non-Docker-Hub registries (e.g. ghcr.io), the server must be prepended
to the image name so Docker pulls from the correct registry.

Returns:
str: Full image reference (e.g. 'ghcr.io/misp/misp-docker/misp-core:latest')
"""
image = self.cfg_image
if not image:
return image
cr_server, _, _ = self._get_cr_data()
if not cr_server or cr_server.lower() in ('docker.io', 'registry.hub.docker.com'):
return image
# end if
# Avoid double-prefixing if user already included the registry in image name
if image.lower().startswith(cr_server.lower() + '/'):
return image
# end if
return f"{cr_server}/{image}"

def _login_to_registry(self):
"""
Login to a private container registry using credentials from _get_cr_data.
Expand Down Expand Up @@ -89,6 +112,8 @@ def _get_default_env_vars(self):
"R1EN_R1FS_API_URL": f"http://{localhost_ip}:31235",
"EE_CHAINSTORE_PEERS": str_chainstore_peers,
"R1EN_CHAINSTORE_PEERS": str_chainstore_peers,

"R1EN_CONTAINER_IP": self._get_container_ip()

# OBSERVATION: From now on only add new env vars with R1EN_ prefix
# to avoid missunderstandings with EE_ prefixed vars that
Expand Down Expand Up @@ -138,7 +163,30 @@ def _get_chainstore_response_data(self):

return data

def _setup_dynamic_env_var_host_ip(self):
def _get_container_ip(self):
"""
Get the container's IP address from Docker network settings.

If a semaphore is configured, also sets the CONTAINER_IP env var
in shared memory for paired plugins.

Returns
-------
str or None
Container IP address, or None if unavailable
"""
if not self.container:
return None
try:
self.container.reload()
container_ip = self.container.attrs['NetworkSettings']['IPAddress']
return container_ip
except Exception as e:
self.P(f"Could not get container IP: {e}", color='r')
return None


def _setup_dynamic_env_var_host_ip(self, **kwargs):
"""
Get host IP address for dynamic environment variable.

Expand All @@ -149,7 +197,36 @@ def _setup_dynamic_env_var_host_ip(self):
"""
return self.log.get_localhost_ip()

def _setup_dynamic_env_var_some_other_calc_type(self):

def _setup_dynamic_env_var_shmem(self, path, **kwargs):
"""
Get a value from shared memory semaphore environment.

Parameters
----------
path : list or str
If list: [semaphore_key, env_key] - joined with '_' to form full key
If str: the full prefixed key to look up directly

Returns
-------
str
The semaphore env value, or empty string if not found
"""
if isinstance(path, list):
if len(path) != 2:
self.P(f"Invalid shmem path: {path}. Expected [semaphore_key, env_key]", color='r')
return ""
value = self.semaphore_get_env_value(path[0], path[1])
elif isinstance(path, str):
value = self.semaphore_get_env_value_by_path(path)
else:
self.P(f"Invalid shmem path type: {type(path)}. Expected list or str", color='r')
return ""
return str(value) if value else ""


def _setup_dynamic_env_var_some_other_calc_type(self, **kwargs):
"""
Example dynamic environment variable calculator.

Expand Down Expand Up @@ -206,11 +283,15 @@ def _configure_dynamic_env(self):
if callable(func):
# Call the function and append its result to the variable value
try:
candidate_value = func()
part_kwargs = {k: v for k, v in variable_part.items() if k != 'type'}
candidate_value = func(**part_kwargs)
found = True
except:
self.P(f"Error calling function {func_name} for dynamic env var {variable_name}", color='r')
#endif callable
except Exception as e:
self.P(
"Error calling dynamic env var function {} for variable {}: {}".format(
func_name, variable_name, e),
color='r')
# endif callable
#endif hasattr
if not found:
self.P(f"Dynamic env var {variable_name} has invalid type: {part_type}", color='r')
Expand Down
4 changes: 3 additions & 1 deletion extensions/business/container_apps/worker_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

"CAR_VERBOSE": 10,
"IMAGE": "node:22",
"CONTAINER_START_COMMAND": ["sh", "-c", "while true; do sleep 3600; done"],
"CONTAINER_ENTRYPOINT": "sh",
"CONTAINER_START_COMMAND": ["-c", "while true; do sleep 3600; done"],
"CONTAINER_USER": "root",
"BUILD_AND_RUN_COMMANDS": ["npm install", "npm run build", "npm start"],
"SETUP_REPO": True, # defines if we have to set up the repo (should add git clone commands or not)

Expand Down
4 changes: 4 additions & 0 deletions extensions/business/deeploy/deeploy_const.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class DEEPLOY_KEYS:
PLUGINS = "plugins"
PLUGIN_INSTANCES = "instances"
PLUGIN_INSTANCE_ID = "instance_id"
PLUGIN_NAME = "plugin_name"
DEPENDENCY_TREE = "dependency_tree"
# Auth result keys
SENDER = "sender"
SENDER_ESCROW = "sender_escrow"
Expand Down Expand Up @@ -104,6 +106,8 @@ class DEEPLOY_ERRORS:
PLUGINS1 = "ERR25_DEEPLOY_PLUGINS1" # Invalid plugins array structure
PLUGINS2 = "ERR26_DEEPLOY_PLUGINS2" # Plugin missing required field
PLUGINS3 = "ERR27_DEEPLOY_PLUGINS3" # Instance validation failed
DEPENDENCY_TREE1 = "ERR28_DEEPLOY_DEPENDENCY_TREE1" # Invalid dependency_tree entry
DEPENDENCY_TREE2 = "ERR29_DEEPLOY_DEPENDENCY_TREE2" # Circular dependency detected

class DEEPLOY_RESOURCES:
# Result dictionary keys
Expand Down
3 changes: 2 additions & 1 deletion extensions/business/deeploy/deeploy_manager_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ def _process_pipeline_request(
if job_app_type not in JOB_APP_TYPES_ALL:
raise ValueError(f"Invalid job_app_type '{job_app_type}'. Expected one of {JOB_APP_TYPES_ALL}.")
else:
job_app_type = self.deeploy_detect_job_app_type(self.deeploy_prepare_plugins(inputs))
plugins_for_detection, _ = self.deeploy_prepare_plugins(inputs)
job_app_type = self.deeploy_detect_job_app_type(plugins_for_detection)
if job_app_type not in JOB_APP_TYPES_ALL:
job_app_type = JOB_APP_TYPES.NATIVE
self.P(f"Detected job app type: {job_app_type}")
Expand Down
Loading
Loading