Skip to content
2 changes: 1 addition & 1 deletion tesp_api/repository/model/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TesTaskInput(BaseModel):
..., description="Path of the file inside the container. Must be an absolute path.",
example="/data/file1")

type: TesTaskIOType = Field(..., example=TesTaskIOType.FILE)
type: TesTaskIOType = TesTaskIOType.FILE
content: str = Field(
None, description="File content literal. Implementations should support a minimum of 128 KiB "
"in this field and may define their own maximum. UTF-8 encoded If content "
Expand Down
135 changes: 69 additions & 66 deletions tesp_api/service/event_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,10 @@
import datetime
from typing import List
from pathlib import Path

from pymonad.maybe import Just
from bson.objectid import ObjectId
from pymonad.promise import Promise

from tesp_api.utils.docker import (
docker_run_command,
docker_stage_in_command,
docker_stage_out_command,
map_volumes
)
from tesp_api.utils.singularity import (
singularity_run_command,
singularity_stage_in_command,
singularity_stage_out_command
)
from tesp_api.utils.container import stage_in_command, run_command, stage_out_command, map_volumes
from tesp_api.service.pulsar_service import pulsar_service
from tesp_api.service.event_dispatcher import dispatch_event
from tesp_api.utils.functional import get_else_throw, maybe_of
Expand All @@ -31,11 +19,12 @@
TesTaskExecutor,
TesTaskResources,
TesTaskInput,
TesTaskOutput
TesTaskOutput,
TesTaskIOType
)
from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time

CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both")
CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker")


@local_handler.register(event_name="queued_task")
Expand Down Expand Up @@ -93,16 +82,24 @@ async def setup_data(job_id: ObjectId,

print(inputs)

for i in range(0, len(inputs)):
content = inputs[i].content
pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}'
if content is not None and inputs[i].url is None:
#content = await file_transfer_service.download_file(inputs[i].url)
for i, input_item in enumerate(inputs):
if input_item.type == TesTaskIOType.DIRECTORY:
pulsar_path = payload['task_config']['inputs_directory'] + f'/input_dir_{i}'
elif input_item.content is not None and input_item.url is None:
pulsar_path = await pulsar_operations.upload(
job_id, DataType.INPUT,
file_content=Just(content),
file_path=f'input_file_{i}')
input_confs.append({'container_path': inputs[i].path, 'pulsar_path': pulsar_path, 'url':inputs[i].url})
file_content=Just(input_item.content),
file_path=f'input_file_{i}'
)
else:
pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}'

input_confs.append({
'container_path': input_item.path,
'pulsar_path': pulsar_path,
'url': input_item.url,
'type': input_item.type
})

return resource_conf, volume_confs, input_confs, output_confs

Expand All @@ -126,7 +123,7 @@ async def setup_data(job_id: ObjectId,
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
.then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function
.then(lambda x: x)


@local_handler.register(event_name="run_task")
Expand Down Expand Up @@ -156,57 +153,63 @@ async def handle_run_task(event: Event) -> None:
start_time=Just(datetime.datetime.now(datetime.timezone.utc))
)

# prepare docker commands
container_cmds = list()
# stage-in
print("Payload:")
print(payload)
stage_in_mount = payload['task_config']['inputs_directory']
stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest",
command=[],
workdir=Path("/downloads"))

if CONTAINER_TYPE == "docker":
stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs)
elif CONTAINER_TYPE == "singularity":
stage_exec.image = "docker://" + stage_exec.image
stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs)

# container_cmds.append(stage_in_command)
command=[],
workdir=Path("/downloads"))
# stage-in
stage_in_cmd = ""
if input_confs:
stage_in_mount = payload['task_config']['inputs_directory']
stage_in_cmd = stage_in_command(
stage_exec,
resource_conf,
stage_in_mount,
input_confs,
CONTAINER_TYPE
)

# main execution
container_cmds = []
for i, executor in enumerate(task.executors):
if CONTAINER_TYPE == "docker":
run_command, script_content = docker_run_command(executor, task_id, resource_conf, volume_confs,
input_confs, output_confs, stage_in_mount, i)
elif CONTAINER_TYPE == "singularity":
mount_job_dir = payload['task_config']['job_directory']
run_command, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs,
input_confs, output_confs, stage_in_mount, mount_job_dir, i)


await pulsar_operations.upload(
payload['task_id'], DataType.INPUT,
file_content=Just(script_content),
file_path=f'run_script_{i}.sh')
container_cmds.append(run_command)

if CONTAINER_TYPE == "docker":
stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs)
elif CONTAINER_TYPE == "singularity":
mount_job_dir = payload['task_config']['job_directory']
bind_mount = payload['task_config']['inputs_directory']
stage_out_command = singularity_stage_out_command(stage_exec, resource_conf, bind_mount,
output_confs, volume_confs, mount_job_dir)

# Join all commands with " && "
run_commands = " && ".join(container_cmds)
run_cmd = run_command(
executor=executor,
job_id=task_id,
resource_conf=resource_conf,
volume_confs=volume_confs,
input_confs=input_confs,
output_confs=output_confs,
inputs_directory=stage_in_mount if input_confs else "",
container_type=CONTAINER_TYPE,
job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None,
executor_index=i
)
container_cmds.append(run_cmd)

# stage-out
stage_out_cmd = ""
if output_confs:
stage_out_cmd = stage_out_command(
stage_exec,
resource_conf,
output_confs,
volume_confs,
container_type=CONTAINER_TYPE,
bind_mount=payload['task_config']['inputs_directory'] if CONTAINER_TYPE == "singularity" else None,
job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None
)

run_command = (f"""set -xe && {stage_in_command} && {run_commands} && {stage_out_command}""")
# Combine commands
run_commands = " && ".join(container_cmds)
parts = ["set -xe", stage_in_cmd, run_commands, stage_out_cmd]
non_empty_parts = [p.strip() for p in parts if p and p.strip()]
full_command = " && ".join(non_empty_parts)
print(full_command)

command_start_time = datetime.datetime.now(datetime.timezone.utc)

# start the task (docker container/s) in the pulsar
await pulsar_operations.run_job(task_id, run_command)
await pulsar_operations.run_job(task_id, full_command)

# wait for the task
command_status = await pulsar_operations.job_status_complete(str(task_id))
Expand Down
Loading
Loading