diff --git a/tesp_api/repository/model/task.py b/tesp_api/repository/model/task.py index 7a31702..7f64889 100644 --- a/tesp_api/repository/model/task.py +++ b/tesp_api/repository/model/task.py @@ -52,7 +52,7 @@ class TesTaskInput(BaseModel): ..., description="Path of the file inside the container. Must be an absolute path.", example="/data/file1") - type: TesTaskIOType = Field(..., example=TesTaskIOType.FILE) + type: TesTaskIOType = TesTaskIOType.FILE content: str = Field( None, description="File content literal. Implementations should support a minimum of 128 KiB " "in this field and may define their own maximum. UTF-8 encoded If content " diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 09c837e..7c235c5 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -2,22 +2,10 @@ import datetime from typing import List from pathlib import Path - from pymonad.maybe import Just from bson.objectid import ObjectId from pymonad.promise import Promise - -from tesp_api.utils.docker import ( - docker_run_command, - docker_stage_in_command, - docker_stage_out_command, - map_volumes -) -from tesp_api.utils.singularity import ( - singularity_run_command, - singularity_stage_in_command, - singularity_stage_out_command -) +from tesp_api.utils.container import stage_in_command, run_command, stage_out_command, map_volumes from tesp_api.service.pulsar_service import pulsar_service from tesp_api.service.event_dispatcher import dispatch_event from tesp_api.utils.functional import get_else_throw, maybe_of @@ -31,11 +19,12 @@ TesTaskExecutor, TesTaskResources, TesTaskInput, - TesTaskOutput + TesTaskOutput, + TesTaskIOType ) from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time -CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both") +CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker") @local_handler.register(event_name="queued_task") @@ -93,16 +82,24 @@ async def setup_data(job_id: ObjectId, print(inputs) - for i in range(0, len(inputs)): - content = inputs[i].content - pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}' - if content is not None and inputs[i].url is None: - #content = await file_transfer_service.download_file(inputs[i].url) + for i, input_item in enumerate(inputs): + if input_item.type == TesTaskIOType.DIRECTORY: + pulsar_path = payload['task_config']['inputs_directory'] + f'/input_dir_{i}' + elif input_item.content is not None and input_item.url is None: pulsar_path = await pulsar_operations.upload( job_id, DataType.INPUT, - file_content=Just(content), - file_path=f'input_file_{i}') - input_confs.append({'container_path': inputs[i].path, 'pulsar_path': pulsar_path, 'url':inputs[i].url}) + file_content=Just(input_item.content), + file_path=f'input_file_{i}' + ) + else: + pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}' + + input_confs.append({ + 'container_path': input_item.path, + 'pulsar_path': pulsar_path, + 'url': input_item.url, + 'type': input_item.type + }) return resource_conf, volume_confs, input_confs, output_confs @@ -126,7 +123,7 @@ async def setup_data(job_id: ObjectId, 'input_confs': res_input_output_confs[2], 'output_confs': res_input_output_confs[3] })).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\ - .then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function + .then(lambda x: x) @local_handler.register(event_name="run_task") @@ -156,57 +153,63 @@ async def handle_run_task(event: Event) -> None: start_time=Just(datetime.datetime.now(datetime.timezone.utc)) ) - # prepare docker commands container_cmds = list() - # stage-in - print("Payload:") - print(payload) - stage_in_mount = payload['task_config']['inputs_directory'] stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", - command=[], - workdir=Path("/downloads")) - - if CONTAINER_TYPE == "docker": - stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - elif CONTAINER_TYPE == "singularity": - stage_exec.image = "docker://" + stage_exec.image - stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - - # container_cmds.append(stage_in_command) + command=[], + workdir=Path("/downloads")) + # stage-in + stage_in_cmd = "" + if input_confs: + stage_in_mount = payload['task_config']['inputs_directory'] + stage_in_cmd = stage_in_command( + stage_exec, + resource_conf, + stage_in_mount, + input_confs, + CONTAINER_TYPE + ) + # main execution + container_cmds = [] for i, executor in enumerate(task.executors): - if CONTAINER_TYPE == "docker": - run_command, script_content = docker_run_command(executor, task_id, resource_conf, volume_confs, - input_confs, output_confs, stage_in_mount, i) - elif CONTAINER_TYPE == "singularity": - mount_job_dir = payload['task_config']['job_directory'] - run_command, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs, - input_confs, output_confs, stage_in_mount, mount_job_dir, i) - - - await pulsar_operations.upload( - payload['task_id'], DataType.INPUT, - file_content=Just(script_content), - file_path=f'run_script_{i}.sh') - container_cmds.append(run_command) - - if CONTAINER_TYPE == "docker": - stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) - elif CONTAINER_TYPE == "singularity": - mount_job_dir = payload['task_config']['job_directory'] - bind_mount = payload['task_config']['inputs_directory'] - stage_out_command = singularity_stage_out_command(stage_exec, resource_conf, bind_mount, - output_confs, volume_confs, mount_job_dir) - - # Join all commands with " && " - run_commands = " && ".join(container_cmds) + run_cmd = run_command( + executor=executor, + job_id=task_id, + resource_conf=resource_conf, + volume_confs=volume_confs, + input_confs=input_confs, + output_confs=output_confs, + inputs_directory=stage_in_mount if input_confs else "", + container_type=CONTAINER_TYPE, + job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None, + executor_index=i + ) + container_cmds.append(run_cmd) + + # stage-out + stage_out_cmd = "" + if output_confs: + stage_out_cmd = stage_out_command( + stage_exec, + resource_conf, + output_confs, + volume_confs, + container_type=CONTAINER_TYPE, + bind_mount=payload['task_config']['inputs_directory'] if CONTAINER_TYPE == "singularity" else None, + job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None + ) - run_command = (f"""set -xe && {stage_in_command} && {run_commands} && {stage_out_command}""") + # Combine commands + run_commands = " && ".join(container_cmds) + parts = ["set -xe", stage_in_cmd, run_commands, stage_out_cmd] + non_empty_parts = [p.strip() for p in parts if p and p.strip()] + full_command = " && ".join(non_empty_parts) + print(full_command) command_start_time = datetime.datetime.now(datetime.timezone.utc) # start the task (docker container/s) in the pulsar - await pulsar_operations.run_job(task_id, run_command) + await pulsar_operations.run_job(task_id, full_command) # wait for the task command_status = await pulsar_operations.job_status_complete(str(task_id)) diff --git a/tesp_api/utils/container.py b/tesp_api/utils/container.py new file mode 100644 index 0000000..9daea2d --- /dev/null +++ b/tesp_api/utils/container.py @@ -0,0 +1,368 @@ +import os +import re +import shlex +from urllib.parse import urlparse +from typing import Dict, List, Tuple, Optional, Union + +from pymonad.maybe import Nothing, Maybe, Just + +from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput, TesTaskIOType +from tesp_api.utils.functional import get_else_throw, maybe_of + +SHELL_PATTERN = re.compile(r"[|&;<>(){}$*?\"'\\]") + +class ContainerCommandBuilder: + def __init__(self, container_type: str) -> None: + self.container_type = container_type + self._job_id: str = "" + self._resource_cpu: Maybe[str] = Nothing + self._resource_mem: Maybe[str] = Nothing + self._image: Maybe[str] = Nothing + self._workdir: Maybe[str] = Nothing + self._envs: Dict[str, str] = {} + self._volumes: Dict[str, str] = {} + self._bind_mounts: Dict[str, str] = {} + self._command: Maybe[str] = Nothing + self._stdin: Maybe[str] = Nothing + self._stdout: Maybe[str] = Nothing + self._stderr: Maybe[str] = Nothing + + def with_job_id(self, job_id: str): + self._job_id = job_id + return self + + def with_resource(self, resources: dict): + if not resources: + return self + self._resource_cpu = maybe_of(resources.get("cpu_cores")) + self._resource_mem = maybe_of(resources.get("ram_gb")) + return self + + def with_bind_mount(self, container_path: str, host_path: str): + self._bind_mounts[container_path] = host_path + return self + + def with_volume(self, container_path: str, volume_name: str): + self._volumes[container_path] = volume_name + return self + + def with_image(self, image: str): + if self.container_type == "singularity" and not image.startswith("docker://"): + self._image = Just(f"docker://{image}") + else: + self._image = Just(image) + return self + + def with_workdir(self, workdir: str): + self._workdir = maybe_of(workdir) + return self + + def with_env(self, name: str, value: str): + self._envs[name] = value + return self + + def with_command(self, command: List[str], stdin: Maybe[str] = Nothing, + stdout: Maybe[str] = Nothing, stderr: Maybe[str] = Nothing): + self._stdin = stdin + self._stdout = stdout + self._stderr = stderr + + if not command: + self._command = Nothing + return self + + # Check if already wrapped in shell + if self._is_shell_wrapped(command): + self._command = Just(command) + return self + + # Wrap in shell if needed + if self.requires_shell(command) or self.container_type == "singularity": + shell_cmd = " ".join(shlex.quote(str(arg)) for arg in command) + shell_cmd = self._escape_redirections( + shell_cmd, + self._stdin, + self._stdout, + self._stderr + ) + self._command = Just(["sh", "-c", shell_cmd]) + else: + cmd = list(map(str, command)) + cmd = self._add_redirections(cmd) + self._command = Just(cmd) + + return self + + def _is_shell_wrapped(self, command) -> bool: + if isinstance(command, list): + return len(command) >= 2 and command[0] == "sh" and command[1] == "-c" + return False + + def requires_shell(self, command: List[str]) -> bool: + return any( + isinstance(arg, str) and SHELL_PATTERN.search(arg) + for arg in command + ) + + def _escape_redirections(self, cmd: str, stdin, stdout, stderr) -> str: + redirections = [ + ('<', stdin), + ('>', stdout), + ('2>', stderr), + ] + for op, val in redirections: + if val and val != Nothing: + path = val.maybe("", lambda x: x) + if path: + cmd += f" {op} {shlex.quote(path)}" + return cmd + + def _add_redirections(self, cmd: List[str]) -> List[str]: + redirections = [] + if self._stdin != Nothing: + path = self._stdin.maybe("", lambda x: x) + if path: + redirections.extend(["<", path]) + if self._stdout != Nothing: + path = self._stdout.maybe("", lambda x: x) + if path: + redirections.extend([">", path]) + if self._stderr != Nothing: + path = self._stderr.maybe("", lambda x: x) + if path: + redirections.extend(["2>", path]) + return cmd + redirections + + def reset(self) -> None: + self._resource_cpu = Nothing + self._resource_mem = Nothing + self._image = Nothing + self._workdir = Nothing + self._envs = {} + self._volumes = {} + self._bind_mounts = {} + self._command = Nothing + self._stdin = Nothing + self._stdout = Nothing + self._stderr = Nothing + return self + + def get_run_command(self) -> str: + # Common resource flags + cpu_flag = self._resource_cpu.maybe("", lambda cpu: + f"--cpus={cpu}" if self.container_type == "docker" else f"--cpu={cpu}") + + mem_flag = self._resource_mem.maybe("", lambda mem: + f"--memory={mem}g" if self.container_type == "docker" else f"--memory={mem}G") + + # Environment variables + env_flags = [] + for k, v in self._envs.items(): + if self.container_type == "docker": + env_flags.append(f'-e {shlex.quote(k)}={shlex.quote(v)}') + else: + env_flags.append(f'--env {k}="{v}"') + + # Work directory + workdir_flag = self._workdir.maybe("", lambda w: + f'-w "{w}"' if self.container_type == "docker" else f'--pwd "{w}"') + + # Image + image = self._image.maybe("", lambda i: i) + + # Mounts + mount_flags = [] + if self.container_type == "docker": + for container_path, host_path in self._bind_mounts.items(): + mount_flags.append(f'-v "{host_path}":"{container_path}"') + for container_path, volume_name in self._volumes.items(): + mount_flags.append(f'-v "{volume_name}":"{container_path}"') + else: # singularity + for container_path, host_path in self._bind_mounts.items(): + mount_flags.append(f'-B "{host_path}":"{container_path}"') + for container_path, volume_name in self._volumes.items(): + mount_flags.append(f'-B "{volume_name}":"{container_path}"') + + # Command + command_str = self._command.maybe("", lambda cmd: + " ".join(shlex.quote(arg) for arg in cmd) if isinstance(cmd, list) else cmd) + + # Build final command + if self.container_type == "docker": + return ( + f"docker run {cpu_flag} {mem_flag} {workdir_flag} " + f"{' '.join(env_flags)} {' '.join(mount_flags)} " + f"{image} {command_str}" + ).strip() + else: # singularity + return ( + f"singularity exec {cpu_flag} {mem_flag} {workdir_flag} " + f"{' '.join(env_flags)} {' '.join(mount_flags)} " + f"{image} {command_str}" + ).strip() + +# Unified command functions +def stage_in_command( + executor: TesTaskExecutor, + resource_conf: dict, + bind_mount: str, + input_confs: List[dict], + container_type: str +) -> str: + builder = ContainerCommandBuilder(container_type) \ + .with_image(executor.image) \ + .with_workdir(executor.workdir) \ + .with_resource(resource_conf) + + commands = [] + for input_conf in input_confs: + url = input_conf.get('url') + if not url: + continue + + path = input_conf['pulsar_path'] + input_type = input_conf.get('type', TesTaskIOType.FILE) + filename = os.path.basename(path) + + if input_type == TesTaskIOType.DIRECTORY: + # Recursive download + commands.append( + f"wget --mirror --no-parent --no-host-directories " + f"--directory-prefix={shlex.quote(filename)} {shlex.quote(url)}" + ) + else: + # Single file download + commands.append(f"curl -o {shlex.quote(filename)} {shlex.quote(url)}") + + if commands: + builder.with_command(["sh", "-c", " && ".join(commands)]) + + builder.with_bind_mount(executor.workdir, bind_mount) + + if executor.env: + for env_name, env_value in executor.env.items(): + builder.with_env(env_name, env_value) + + return builder.get_run_command() + +def run_command( + executor: TesTaskExecutor, + job_id: str, + resource_conf: dict, + volume_confs: List[dict], + input_confs: List[dict], + output_confs: List[dict], + inputs_directory: str, + container_type: str, + job_directory: Optional[str] = None, + executor_index: int = 0 +) -> str: + builder = ContainerCommandBuilder(container_type) \ + .with_job_id(job_id) \ + .with_image(executor.image) \ + .with_command( + list(map(str, executor.command)), + maybe_of(executor.stdin).map(str), + maybe_of(executor.stdout).map(str), + maybe_of(executor.stderr).map(str) + ) \ + .with_workdir(executor.workdir) \ + .with_resource(resource_conf) + + if executor.env: + for env_name, env_value in executor.env.items(): + builder.with_env(env_name, env_value) + + # Handle volumes and bind mounts + for volume_conf in volume_confs: + builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) + + for input_conf in input_confs: + builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) + + return builder.get_run_command() + +def stage_out_command( + executor: TesTaskExecutor, + resource_conf: dict, + output_confs: List[dict], + volume_confs: List[dict], + container_type: str, + bind_mount: Optional[str] = None, + job_directory: Optional[str] = None +) -> str: + builder = ContainerCommandBuilder(container_type) \ + .with_image(executor.image) \ + .with_workdir(executor.workdir) \ + .with_resource(resource_conf) + + commands = [] + for output in output_confs: + path = output['container_path'] + url = output['url'] + output_type = output.get('type', TesTaskIOType.FILE) + + if output_type == TesTaskIOType.DIRECTORY: + # Recursive upload + cmd = ( + f"find {shlex.quote(path)} -type f -exec " + f"curl -X POST -F 'file=@{{}}' {shlex.quote(url)} \\;" + ) + else: + # Single file upload + cmd = f"curl -X POST -F 'file=@{shlex.quote(path)}' {shlex.quote(url)}" + + commands.append(cmd) + + if commands: + builder.with_command(["sh", "-c", " && ".join(commands)]) + + # Mount required directories + if container_type == "singularity" and bind_mount: + builder.with_bind_mount(executor.workdir, bind_mount) + + for volume_conf in volume_confs: + if container_type == "docker": + builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) + elif container_type == "singularity" and job_directory: + builder.with_bind_mount(volume_conf['container_path'], job_directory) + + if executor.env: + for env_name, env_value in executor.env.items(): + builder.with_env(env_name, env_value) + + return builder.get_run_command() + +# Volume mapping function remains the same +def map_volumes(job_id: str, volumes: List[str], outputs: List[TesTaskOutput]): + output_confs: List[dict] = [] + volume_confs: List[dict] = [] + existing_volume_paths = [] + + # Process outputs + for output in outputs: + output_dirname = os.path.dirname(output.path) + volume_name = f"vol-{job_id}-{output_dirname.replace('/', '')}" + + if output_dirname not in existing_volume_paths: + volume_confs.append({ + 'volume_name': volume_name, + 'container_path': output_dirname + }) + existing_volume_paths.append(output_dirname) + + output_confs.append({ + 'container_path': output.path, + 'url': output.url, + 'volume_name': volume_name, + 'type': output.type + }) + + for v in volumes: + if str(v) not in existing_volume_paths: + volume_confs.append({ + 'volume_name': f"vol-{job_id}-{str(v).replace('/', '')}", + 'container_path': str(v) + }) + + return output_confs, volume_confs diff --git a/tesp_api/utils/docker.py b/tesp_api/utils/docker.py deleted file mode 100644 index 1e9e942..0000000 --- a/tesp_api/utils/docker.py +++ /dev/null @@ -1,223 +0,0 @@ -import os -from typing import Dict, List, Tuple - -from pymonad.maybe import Nothing, Maybe, Just - -from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput -from tesp_api.utils.functional import get_else_throw, maybe_of - - -class DockerRunCommandBuilder: - - def __init__(self) -> None: - self._job_id: str = "" - self._resource_cpu: Maybe[str] = Nothing - self._resource_mem: Maybe[str] = Nothing - self._docker_image: Maybe[str] = Nothing - self._workdir: Maybe[str] = Nothing - self._envs: Dict[str, str] = {} - self._volumes: Dict[str, str] = {} - self._bind_mounts: Dict[str, str] = {} - self._command: Maybe[str] = Nothing - - def with_job_id(self, job_id: str): - self._job_id = job_id - return self - - def with_resource(self, resources: dict): - if not resources: return self - self._resource_cpu = maybe_of(resources["cpu_cores"]) - self._resource_mem = maybe_of(resources["ram_gb"]) - return self - - def with_bind_mount(self, container_path: str, host_path: str): - self._bind_mounts[container_path] = host_path - return self - - def with_volume(self, container_path: str, volume_name: str): - self._volumes[container_path] = volume_name - return self - - def with_image(self, image: str): - self._docker_image = Just(image) - return self - - def with_workdir(self, workdir: str): - self._workdir = maybe_of(workdir) - return self - - def with_env(self, name: str, value: str): - self._envs[name] = value - return self - - def with_command(self, command: List[str], stdin: Maybe[str] = Nothing, - stdout: Maybe[str] = Nothing, stderr: Maybe[str] = Nothing): - command_str = " ".join(command) - self._command = Just(command_str) if command_str else Nothing - - # sh -c '' # there probably must be ' instead of " because of the passing unresolved envs into the container - self._command = self._command.map(lambda _command: - f'{_command}' - f'{stdin.maybe("", lambda x: " <" + x)}' - f'{stdout.maybe("", lambda x: " 1>" + x)}' - f'{stderr.maybe("", lambda x: " 2>" + x)}') - return self - - def reset(self) -> None: - self._resource_cpu = Nothing - self._resource_mem = Nothing - self._docker_image = Nothing - self._workdir = Nothing - self._volumes = {} - self._bind_mounts = {} - return self - - def get_run_command(self) -> str: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - bind_mounts_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._bind_mounts.items())) - volumes_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - docker_image = get_else_throw(self._docker_image, ValueError('Docker image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"-w=\"{str(workdir)}\"") - env_str = " ".join(map(lambda env: f'-e {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) - - run_command = f'docker run {resources_str} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {docker_image} {command_str}' - self.reset() - return run_command - - def get_run_command_script(self, inputs_directory: str, i: int) -> Tuple[str, str]: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - bind_mounts_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._bind_mounts.items())) - volumes_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - docker_image = get_else_throw(self._docker_image, ValueError('Docker image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"-w=\"{str(workdir)}\"") - volumes_str += f' -v "{inputs_directory}/run_script_{i}.sh":"/tmp/{self._job_id}/run_script_{i}.sh"' - env_str = " ".join(map(lambda env: f'-e {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) - - chmod_commands = f"chmod +x /tmp/{self._job_id}/run_script_{i}.sh" - if self._bind_mounts: - chmod_commands += ' && ' + ' && '.join(f"chmod +x {key}" for key in self._bind_mounts) - if self._volumes: - chmod_commands += ' && ' + ' && '.join(f"chmod +x {key}" for key in self._volumes) - - # Define the content of the script - script_content = f'''\ - #!/bin/bash - {command_str} - ''' - - run_command = (f'docker run {resources_str} {workdir_str} {env_str} ' - f'{volumes_str} {bind_mounts_str} {docker_image} ' - f'sh -c "{chmod_commands} && /tmp/{self._job_id}/run_script_{i}.sh"') - - self.reset() - return run_command, script_content - -def docker_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: dict, volume_confs: List[dict], - input_confs: List[dict], output_confs: List[dict], inputs_directory: str, i: int) -> Tuple[str, str]: - command_builder = DockerRunCommandBuilder()\ - .with_job_id(job_id) \ - .with_image(executor.image) \ - .with_command( - list(map(lambda x: str(x), executor.command)), - maybe_of(executor.stdin).map(lambda x: str(x)), - maybe_of(executor.stdout).map(lambda x: str(x)), - maybe_of(executor.stderr).map(lambda x: str(x))) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - [command_builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) - for volume_conf in volume_confs] - [command_builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) - for input_conf in input_confs] - - return command_builder.get_run_command_script(inputs_directory, i) - -def docker_stage_in_command(executor: TesTaskExecutor, resource_conf: dict, - bind_mount: str, input_confs: List[dict]) -> str: - command_builder = DockerRunCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - command = "" - - for input in input_confs: - if (input['url']): - command += "curl -o " + os.path.basename(input['pulsar_path']) + " '" + input['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') - - command_builder.with_bind_mount(executor.workdir, bind_mount) - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - return command_builder.get_run_command() - -def docker_stage_out_command(executor: TesTaskExecutor, resource_conf: dict, - output_confs: List[dict], volume_confs: List[dict]) -> str: - command_builder = DockerRunCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - command = "" - - for output in output_confs: - command += "curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@" \ - + output['container_path'] + "' '" + output['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') - - for volume_conf in volume_confs: - command_builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - return command_builder.get_run_command() - -def map_volumes(job_id: str, volumes: List[str], outputs: List[TesTaskOutput]): - output_confs: List[dict] = [] - volume_confs: List[dict] = [] - - existing_volume_paths = [] - - # Process outputs - for output in outputs: - output_dirname = os.path.dirname(output.path) - volume_name = f"vol-{job_id}-{output_dirname.replace('/', '')}" - - if output_dirname not in existing_volume_paths: - volume_confs.append({ - 'volume_name': volume_name, - 'container_path': output_dirname - }) - existing_volume_paths.append(output_dirname) - - output_confs.append({ - 'container_path': output.path, - 'url': output.url, - 'volume_name': volume_name - }) - - for v in volumes: - if str(v) not in existing_volume_paths: - volume_confs.append({ - 'volume_name': f"vol-{job_id}-{str(v).replace('/', '')}", - 'container_path': str(v) - }) - - return output_confs, volume_confs - diff --git a/tesp_api/utils/singularity.py b/tesp_api/utils/singularity.py deleted file mode 100644 index 4fc2967..0000000 --- a/tesp_api/utils/singularity.py +++ /dev/null @@ -1,190 +0,0 @@ -import os -from typing import Dict, List, Tuple - -from pymonad.maybe import Nothing, Maybe, Just - -from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput -from tesp_api.utils.functional import get_else_throw, maybe_of - - -class SingularityCommandBuilder: - - def __init__(self) -> None: - self._job_id: str = "" - self._resource_cpu: Maybe[str] = Nothing - self._resource_mem: Maybe[str] = Nothing - self._singularity_image: Maybe[str] = Nothing - self._workdir: Maybe[str] = Nothing - self._envs: Dict[str, str] = {} - self._volumes: Dict[str, str] = {} - self._bind_mounts: Dict[str, str] = {} - self._command: Maybe[str] = Nothing - - def with_job_id(self, job_id: str): - self._job_id = job_id - return self - - def with_resource(self, resources: dict): - if not resources: return self - self._resource_cpu = maybe_of(resources["cpu_cores"]) - self._resource_mem = maybe_of(resources["ram_gb"]) - return self - - def with_bind_mount(self, container_path: str, host_path: str): - self._bind_mounts[container_path] = host_path - return self - - def with_volume(self, container_path: str, volume_name: str): - self._volumes[container_path] = volume_name - return self - - def with_image(self, image: str): - self._singularity_image = Just(image) - return self - - def with_workdir(self, workdir: str): - self._workdir = maybe_of(workdir) - return self - - def with_env(self, name: str, value: str): - self._envs[name] = value - return self - - def with_command(self, command: List[str], stdin: Maybe[str] = Nothing, - stdout: Maybe[str] = Nothing, stderr: Maybe[str] = Nothing): - command_str = " ".join(command) - self._command = Just(command_str) if command_str else Nothing - - # sh -c '' # there probably must be ' instead of " because of the passing unresolved envs into the container - self._command = self._command.map(lambda _command: - f'/bin/bash -c \'{_command}' # f'"{_command}' - f'{stdin.maybe("", lambda x: " <" + x)}' - f'{stdout.maybe("", lambda x: " 1>" + x)}' - f'{stderr.maybe("", lambda x: " 2>" + x)}\'') - return self - - def reset(self) -> None: - self._resource_cpu = Nothing - self._resource_mem = Nothing - self._singularity_image = Nothing - self._workdir = Nothing - self._volumes = {} - self._bind_mounts = {} - return self - - def get_singularity_run_command(self) -> str: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - first_key, first_value = next(iter(self._bind_mounts.items())) - bind_mounts_str = f'-B "{first_value}":"{first_key}"' - volumes_str = " ".join(map(lambda v_paths: f'-B \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - singularity_image = get_else_throw(self._singularity_image, ValueError('Singularity image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"--pwd \"{str(workdir)}\"") - env_str = " ".join(map(lambda env: f'--env {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) - - run_command = f"""singularity exec {resources_str} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {singularity_image} {command_str}""" - # run_command = f'singularity exec {resources_str} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {singularity_image} {command_str}' - self.reset() - return run_command - - def get_singularity_run_command_script(self, inputs_directory: str, i: int) -> Tuple[str, str]: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - bind_mounts_str = " ".join(map(lambda v_paths: f'-B \"{v_paths[1]}\":\"{v_paths[0]}\"', self._bind_mounts.items())) - volumes_str = " ".join(map(lambda v_paths: f'-B \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - singularity_image = get_else_throw(self._singularity_image, ValueError('Singularity image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"--pwd \"{str(workdir)}\"") - volumes_str += f' -B "{inputs_directory}/run_script_{i}.sh":"/tmp/{self._job_id}/run_script_{i}.sh"' - env_str = " ".join(map(lambda env: f'--env {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) - - # Define the content of the script - script_content = f'''\ - #!/bin/bash - {command_str} - ''' - - run_command = (f'singularity exec {resources_str} {workdir_str} {env_str} ' - f'{volumes_str} {bind_mounts_str} {singularity_image} ' - f'/bin/bash /tmp/{self._job_id}/run_script_{i}.sh') - self.reset() - return run_command, script_content - -def singularity_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: dict, - volume_confs: List[dict], input_confs: List[dict], - output_confs: List[dict], inputs_directory: str, job_directory: str, i: int) -> Tuple[str, str]: - command_builder = SingularityCommandBuilder() \ - .with_job_id(job_id) \ - .with_image(executor.image) \ - .with_command( - list(map(lambda x: str(x), executor.command)), - maybe_of(executor.stdin).map(lambda x: str(x)), - maybe_of(executor.stdout).map(lambda x: str(x)), - maybe_of(executor.stderr).map(lambda x: str(x))) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - # This is made only for Galaxy and wil likely not work with different structure of a job - command_builder.with_volume(volume_confs[0]['container_path'], job_directory) - [command_builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) - for input_conf in input_confs] - - return command_builder.get_singularity_run_command_script(inputs_directory, i) - -def singularity_stage_in_command(executor: TesTaskExecutor, resource_conf: dict, bind_mount: str, - input_confs: List[dict]) -> str: - command_builder = SingularityCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - command = "" - - for input_conf in input_confs: - if input_conf['url']: - filename = os.path.basename(input_conf['pulsar_path']) - command += f"""curl -o {filename} '{input_conf['url']}' && """ - # command += "curl -o " + os.path.basename(input_conf['pulsar_path']) + " '" + input_conf['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') - - command_builder.with_bind_mount(executor.workdir, bind_mount) - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - return command_builder.get_singularity_run_command() - -def singularity_stage_out_command(executor: TesTaskExecutor, resource_conf: dict, bind_mount: str, - output_confs: List[dict], volume_confs: List[dict], job_directory: str) -> str: - command_builder = SingularityCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - command = "" - - for output in output_confs: - command += f"""curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@{output['container_path']}' '{output['url']}' && """ - - # command += "curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@" \ - # + output['container_path'] + "' '" + output['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') - - # This is made only for Galaxy and wil likely not work with different structure of a job - command_builder.with_bind_mount(volume_confs[0]['container_path'], job_directory) - #command_builder.with_volume(volume_confs[0]['container_path'], job_directory) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - return command_builder.get_singularity_run_command() diff --git a/tests/smoke_tests.py b/tests/smoke_tests.py index f60c5f9..4177273 100644 --- a/tests/smoke_tests.py +++ b/tests/smoke_tests.py @@ -163,7 +163,7 @@ def test_inputs(): # Downloads and copies file to the shared volume and displays its content with two separate executors. def test_volumes(): - assert _test_simple("volumes.json", 60) + assert _test_simple("volumes.json", 100) # Verifies that environment variables from envs.json are correctly echoed to output files with expected content. def test_envs(): diff --git a/tests/test_jsons/envs.json b/tests/test_jsons/envs.json index ff681ee..f1e64a3 100644 --- a/tests/test_jsons/envs.json +++ b/tests/test_jsons/envs.json @@ -1,29 +1,28 @@ { - "outputs": [ - + "outputs": [ { - "path": "/tesp-api/tests/test_data/env_test_1", - "url": "http://172.17.0.1:5000/upload", - "type": "FILE" + "path": "/tesp-api/tests/test_data/env_test_1", + "url": "http://172.17.0.1:5000/upload", + "type": "FILE" }, { - "path": "/tesp-api/tests/test_data/env_test_2", - "url": "http://172.17.0.1:5000/upload", - "type": "FILE" + "path": "/tesp-api/tests/test_data/env_test_2", + "url": "http://172.17.0.1:5000/upload", + "type": "FILE" } - -], - "executors":[ - { - "image":"ubuntu", - "command":[ - "echo", "$TEST_FILE", ">", "/tesp-api/tests/test_data/env_test_1", "&&", "echo", "$TEST_FILE_2", ">", "/tesp-api/tests/test_data/env_test_2" - ], - "env": { - "TEST_FILE": "first upload successful", - "TEST_FILE_2": "second upload successful" - } - } - ] + ], + "executors": [ + { + "image": "ubuntu", + "command": [ + "sh", + "-c", + "echo \"$TEST_FILE\" > /tesp-api/tests/test_data/env_test_1 && echo \"$TEST_FILE_2\" > /tesp-api/tests/test_data/env_test_2" + ], + "env": { + "TEST_FILE": "first upload successful", + "TEST_FILE_2": "second upload successful" + } + } + ] } - diff --git a/tests/test_jsons/multi_true.json b/tests/test_jsons/multi_true.json index 6b4f445..3a07a8f 100644 --- a/tests/test_jsons/multi_true.json +++ b/tests/test_jsons/multi_true.json @@ -1,22 +1,16 @@ { - "executors":[ - { - "image":"ubuntu", - "command":[ - "sleep", "5", "&&", "true" - ] - }, - { - "image":"ubuntu", - "command":[ - "sleep", "5", "&&", "true" - ] - }, - { - "image":"ubuntu", - "command":[ - "sleep", "5", "&&", "true" - ] - } - ] + "executors": [ + { + "image": "ubuntu", + "command": ["sh", "-c", "sleep 5 && true"] + }, + { + "image": "ubuntu", + "command": ["sh", "-c", "sleep 5 && true"] + }, + { + "image": "ubuntu", + "command": ["sh", "-c", "sleep 5 && true"] + } + ] } diff --git a/tests/test_jsons/state_true.json b/tests/test_jsons/state_true.json index e57b0f5..643bafa 100644 --- a/tests/test_jsons/state_true.json +++ b/tests/test_jsons/state_true.json @@ -2,9 +2,7 @@ "executors":[ { "image":"ubuntu", - "command":[ - "sleep", "20", "&&", "true" - ] + "command": ["sh", "-c", "sleep 5 && echo done"] } ] } diff --git a/tests/test_jsons/volumes.json b/tests/test_jsons/volumes.json index a1d34a2..2e0b45c 100644 --- a/tests/test_jsons/volumes.json +++ b/tests/test_jsons/volumes.json @@ -1,26 +1,29 @@ { - "inputs": [ - { - "path": "/data/file_http", - "url": "http://172.17.0.1:5000/download/test.txt", - "type": "FILE" - } - ], - "volumes": [ - "/vol/" - ], - "executors":[ - { - "image":"ubuntu", - "command":[ - "cp", "/data/file_http", "/vol/", "&&", "cat", "/vol/file_http" - ] - }, - { - "image":"ubuntu", - "command":[ - "cat", "/vol/file_http" - ] - } - ] + "inputs": [ + { + "path": "/data/file_http", + "url": "http://172.17.0.1:5000/download/test.txt", + "type": "FILE" + } + ], + "volumes": [ + "/vol/" + ], + "executors": [ + { + "image": "ubuntu", + "command": [ + "sh", + "-c", + "cp /data/file_http /vol/ && cat /vol/file_http" + ] + }, + { + "image": "ubuntu", + "command": [ + "cat", + "/vol/file_http" + ] + } + ] }