diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 943ae9c3..10c52cc9 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,3 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2025-2026 The WfCommons Team. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + import pathlib import shutil import tarfile diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py index 337b2a06..34094f7e 100644 --- a/tests/translators_loggers/test_translators_loggers.py +++ b/tests/translators_loggers/test_translators_loggers.py @@ -296,7 +296,7 @@ def test_translator(self, backend) -> None: if parser: sys.stderr.write(f"\n[{backend}] Parsing the logs...\n") - reconstructed_workflow : Workflow = parser.build_workflow("reconstructed_workflow") + reconstructed_workflow : Workflow = parser.build_workflow(f"reconstructed_workflow_{backend}") reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json")) original_workflow : Workflow = benchmark.workflow diff --git a/wfcommons/wfinstances/logs/taskvine.py b/wfcommons/wfinstances/logs/taskvine.py index 407236a2..cf367c25 100644 --- a/wfcommons/wfinstances/logs/taskvine.py +++ b/wfcommons/wfinstances/logs/taskvine.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright (c) 2021 The WfCommons Team. +# Copyright (c) 2021-2026 The WfCommons Team. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -138,8 +138,6 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: # Construct the input and output file for each task self._construct_task_input_output_files() - # print("TASK INPUT FILES: " + str(self.task_input_files)) - # print("TASK OUTPUT FILES: " + str(self.task_output_files)) # Construct the workflow self._construct_workflow() @@ -162,38 +160,41 @@ def _construct_task_command_lines(self) -> None: def _construct_file_map(self) -> None: - filename_to_key_map = {} - # One pass through the debug file to create the initial file key -> filename mapping - with open(self.debug_file) as f: + + with open(self.taskgraph_file) as f: for line in f: - if "__vine_env_task" in line: # Ignore that weird task/file - continue - if "infile " in line : - # 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0 - [file_key, filename] = line[line.find("infile ") + len("infile "):].split()[:2] - elif "outfile " in line and "completed with outfile " not in line and "outfile =" not in line: - # 2025/09/30 18:37:19.74 vine_manager[1849324]vine: tx to d64cepyc028.crc.nd.edu (10.32.94.18:47558): outfile temp-rnd-pidiwheippcwbeu fde2b5eb-9713-423a-8bc6-f4f9263ad20b.pkl 0 3 - [file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[:2] - else: - continue - if filename in self.filenames_to_ignore: + if line[0] == "#": continue - # NOTE THAT THE FILENAME MAY NOT BE UNIQUE IN TASKVINE WORKFLOWS, SO - # WE ADD THE KEY - self.files_map[file_key] = {"filename": filename + "." + file_key} - filename_to_key_map[filename] = file_key - - # Pass through the transactions file to get the file sizes - with open(self.debug_file) as f: + + if line.startswith("FILE"): + # FILE file-meta-df1e8b0d0e056c4aedb917abe198a2ff "taskvine_poncho.tar.gz" 0 + [ignore, file_key, filename, ignore] = line.split()[:4] + if len(filename) > 1 and filename[0] == "\"" and filename[-1] == "\"": + filename = filename[1:-1] + if self.filenames_to_ignore and any( + ignore_string in filename for ignore_string in self.filenames_to_ignore + ): + continue + if file_key not in self.files_map: + self.files_map[file_key] = {"filename": filename} + + with open(self.transactions_file) as f: for line in f: - if "): file " in line: - [file_key, file_size] = line[line.find("): file ") + len("): file "):].split()[0:2] - else: + if line[0] == "#": continue - if file_key in self.files_map: - self.files_map[file_key]["size"] = int(file_size) - + if "TRANSFER INPUT" in line: + #1769907492293772 245 WORKER worker-8ac3adb4bf1e86b025d0df194b115b8c TRANSFER INPUT file-meta-04276d901d8d096cf981f7ab55f6d1a5 147667979 72409 1769907492221291 + [ignore, ignore, ignore, ignore, ignore, ignore, file_key, file_size_in_mb] = line.split()[0:8] + if file_key not in self.files_map: + continue + self.files_map[file_key]["size"] = int(file_size_in_mb) + elif "TRANSFER OUTPUT" in line: + #1769907502293773 245 WORKER worker-8ac3adb4bf1e86b025d0df194b115b8c TRANSFER OUTPUT file-meta-04276d901d8d096cf981f7ab55f6d1a5 147667979 72409 1769907502221292 + [ignore, ignore, ignore, ignore, ignore, ignore, file_key, file_size_in_mb] = line.split()[0:8] + if file_key not in self.files_map: + continue + self.files_map[file_key]["size"] = int(file_size_in_mb) def _construct_task_runtimes(self) -> None: task_start_times = {} @@ -220,61 +221,47 @@ def _construct_task_runtimes(self) -> None: float(task_end_times[task_index] - task_start_times[task_index]) / 1_000_000.0) def _construct_task_input_output_files(self) -> None: - # Initialize all entries for task_id in self.known_task_ids: self.task_input_files[task_id] = [] self.task_output_files[task_id] = [] with open(self.taskgraph_file) as f: - for line in f: - if "->" not in line: - continue - if "file-task" in line: # Ignoring what I think are taskvine internal/specific things - continue - line = line[:-1] - # print(f"LINE: {line}") - [source, ignore, destination] = line.split() - # Remove quotes - source = source [1:-1] - destination = destination [1:-2] - # Remove weird file- prefix - source = source.replace("--", "-") # Sometimes there is an unexpected "--"!! - destination = destination.replace("--", "-") # Sometimes there is an unexpected "--"!! - # print(f"source: {source} destination: {destination}") - if source.startswith("file-"): - source = source[len("file-"):] - if destination.startswith("file-"): - destination = destination[len("file-"):] - - if "task-" in source and "file-" not in source: - try: - task_id = int(source.split("-")[1]) - except ValueError as e: - raise Exception(f"The source was {source} and the split around '-' failed!") - - if task_id not in self.task_runtimes: + for line in f: + if line.startswith("TASK"): + # TASK T23 "__vine_env_task-rnd-twtxpejwzsyiebf/bin/run_in_env" INPUTS task-rnd-twtxpejwzsyiebf file-meta-d7504c061a7afd9401c612b4ac7d6be6 file-meta-baab4e4516c4d93a8fcdcbba1a680af7 file-meta-693cb61fedd032b4ddec444b8cce6c89 file-rnd-fnrudlxsaqmlpqq OUTPUTS file-rnd-pdtdqayfmmxyxyp + parts = line.split() + task_key = parts[1] + if not task_key.startswith("T"): continue - file_key = destination - if file_key not in self.files_map: - continue - output_file = self.files_map[file_key]["filename"] - self.task_output_files[task_id].append(output_file) - elif "task" in destination and "file" not in destination: try: - task_id = int(destination.split("-")[1]) - except ValueError as e: - raise Exception(f"The destination was {destination} and the split around '-' failed!") - if task_id not in self.task_runtimes: + task_id = int(task_key[1:]) + except ValueError: continue - file_key = source - if file_key not in self.files_map: + if task_id not in self.known_task_ids: continue - input_file = self.files_map[file_key]["filename"] - self.task_input_files[task_id].append(input_file) - else: - raise ValueError("Error in the taskgraph file") - + input_section = False + output_section = False + for part in parts[2:]: + if part == "INPUTS": + input_section = True + output_section = False + continue + elif part == "OUTPUTS": + input_section = False + output_section = True + continue + else: + if input_section: + file_key = part + if file_key not in self.files_map: + continue + self.task_input_files[task_id].append(self.files_map[file_key]["filename"]) + elif output_section: + file_key = part + if file_key not in self.files_map: + continue + self.task_output_files[task_id].append(self.files_map[file_key]["filename"]) def _construct_workflow(self) -> None: # Create files and put them in a map @@ -288,7 +275,6 @@ def _construct_workflow(self) -> None: # Create all tasks task_map = {} - # print(self.task_runtimes[16]) for task_id in self.known_task_ids: task_name = "Task_%d" % task_id task = Task(name=task_name,