Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2025-2026 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import pathlib
import shutil
import tarfile
Expand Down
2 changes: 1 addition & 1 deletion tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def test_translator(self, backend) -> None:

if parser:
sys.stderr.write(f"\n[{backend}] Parsing the logs...\n")
reconstructed_workflow : Workflow = parser.build_workflow("reconstructed_workflow")
reconstructed_workflow : Workflow = parser.build_workflow(f"reconstructed_workflow_{backend}")
reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))

original_workflow : Workflow = benchmark.workflow
Expand Down
138 changes: 62 additions & 76 deletions wfcommons/wfinstances/logs/taskvine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 The WfCommons Team.
# Copyright (c) 2021-2026 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -138,8 +138,6 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:

# Construct the input and output file for each task
self._construct_task_input_output_files()
# print("TASK INPUT FILES: " + str(self.task_input_files))
# print("TASK OUTPUT FILES: " + str(self.task_output_files))

# Construct the workflow
self._construct_workflow()
Expand All @@ -162,38 +160,41 @@ def _construct_task_command_lines(self) -> None:


def _construct_file_map(self) -> None:

filename_to_key_map = {}
# One pass through the debug file to create the initial file key -> filename mapping
with open(self.debug_file) as f:

with open(self.taskgraph_file) as f:
for line in f:
if "__vine_env_task" in line: # Ignore that weird task/file
continue
if "infile " in line :
# 2025/09/09 21:12:48.02 vine_manager[239]vine: tx to dab178765b01 (127.0.0.1:34382): infile file-rnd-fmtpwpiobiumeze blastall_00000016_outfile_0016 0
[file_key, filename] = line[line.find("infile ") + len("infile "):].split()[:2]
elif "outfile " in line and "completed with outfile " not in line and "outfile =" not in line:
# 2025/09/30 18:37:19.74 vine_manager[1849324]vine: tx to d64cepyc028.crc.nd.edu (10.32.94.18:47558): outfile temp-rnd-pidiwheippcwbeu fde2b5eb-9713-423a-8bc6-f4f9263ad20b.pkl 0 3
[file_key, filename] = line[line.find("outfile ") + len("outfile "):].split()[:2]
else:
continue
if filename in self.filenames_to_ignore:
if line[0] == "#":
continue
# NOTE THAT THE FILENAME MAY NOT BE UNIQUE IN TASKVINE WORKFLOWS, SO
# WE ADD THE KEY
self.files_map[file_key] = {"filename": filename + "." + file_key}
filename_to_key_map[filename] = file_key

# Pass through the transactions file to get the file sizes
with open(self.debug_file) as f:

if line.startswith("FILE"):
# FILE file-meta-df1e8b0d0e056c4aedb917abe198a2ff "taskvine_poncho.tar.gz" 0
[ignore, file_key, filename, ignore] = line.split()[:4]
if len(filename) > 1 and filename[0] == "\"" and filename[-1] == "\"":
filename = filename[1:-1]
if self.filenames_to_ignore and any(
ignore_string in filename for ignore_string in self.filenames_to_ignore
):
continue
if file_key not in self.files_map:
self.files_map[file_key] = {"filename": filename}

with open(self.transactions_file) as f:
for line in f:
if "): file " in line:
[file_key, file_size] = line[line.find("): file ") + len("): file "):].split()[0:2]
else:
if line[0] == "#":
continue
if file_key in self.files_map:
self.files_map[file_key]["size"] = int(file_size)

if "TRANSFER INPUT" in line:
#1769907492293772 245 WORKER worker-8ac3adb4bf1e86b025d0df194b115b8c TRANSFER INPUT file-meta-04276d901d8d096cf981f7ab55f6d1a5 147667979 72409 1769907492221291
[ignore, ignore, ignore, ignore, ignore, ignore, file_key, file_size_in_mb] = line.split()[0:8]
if file_key not in self.files_map:
continue
self.files_map[file_key]["size"] = int(file_size_in_mb)
elif "TRANSFER OUTPUT" in line:
#1769907502293773 245 WORKER worker-8ac3adb4bf1e86b025d0df194b115b8c TRANSFER OUTPUT file-meta-04276d901d8d096cf981f7ab55f6d1a5 147667979 72409 1769907502221292
[ignore, ignore, ignore, ignore, ignore, ignore, file_key, file_size_in_mb] = line.split()[0:8]
if file_key not in self.files_map:
continue
self.files_map[file_key]["size"] = int(file_size_in_mb)

def _construct_task_runtimes(self) -> None:
task_start_times = {}
Expand All @@ -220,61 +221,47 @@ def _construct_task_runtimes(self) -> None:
float(task_end_times[task_index] - task_start_times[task_index]) / 1_000_000.0)

def _construct_task_input_output_files(self) -> None:

# Initialize all entries
for task_id in self.known_task_ids:
self.task_input_files[task_id] = []
self.task_output_files[task_id] = []

with open(self.taskgraph_file) as f:
for line in f:
if "->" not in line:
continue
if "file-task" in line: # Ignoring what I think are taskvine internal/specific things
continue
line = line[:-1]
# print(f"LINE: {line}")
[source, ignore, destination] = line.split()
# Remove quotes
source = source [1:-1]
destination = destination [1:-2]
# Remove weird file- prefix
source = source.replace("--", "-") # Sometimes there is an unexpected "--"!!
destination = destination.replace("--", "-") # Sometimes there is an unexpected "--"!!
# print(f"source: {source} destination: {destination}")
if source.startswith("file-"):
source = source[len("file-"):]
if destination.startswith("file-"):
destination = destination[len("file-"):]

if "task-" in source and "file-" not in source:
try:
task_id = int(source.split("-")[1])
except ValueError as e:
raise Exception(f"The source was {source} and the split around '-' failed!")

if task_id not in self.task_runtimes:
for line in f:
if line.startswith("TASK"):
# TASK T23 "__vine_env_task-rnd-twtxpejwzsyiebf/bin/run_in_env" INPUTS task-rnd-twtxpejwzsyiebf file-meta-d7504c061a7afd9401c612b4ac7d6be6 file-meta-baab4e4516c4d93a8fcdcbba1a680af7 file-meta-693cb61fedd032b4ddec444b8cce6c89 file-rnd-fnrudlxsaqmlpqq OUTPUTS file-rnd-pdtdqayfmmxyxyp
parts = line.split()
task_key = parts[1]
if not task_key.startswith("T"):
continue
file_key = destination
if file_key not in self.files_map:
continue
output_file = self.files_map[file_key]["filename"]
self.task_output_files[task_id].append(output_file)
elif "task" in destination and "file" not in destination:
try:
task_id = int(destination.split("-")[1])
except ValueError as e:
raise Exception(f"The destination was {destination} and the split around '-' failed!")
if task_id not in self.task_runtimes:
task_id = int(task_key[1:])
except ValueError:
continue
file_key = source
if file_key not in self.files_map:
if task_id not in self.known_task_ids:
continue
input_file = self.files_map[file_key]["filename"]
self.task_input_files[task_id].append(input_file)
else:
raise ValueError("Error in the taskgraph file")

input_section = False
output_section = False
for part in parts[2:]:
if part == "INPUTS":
input_section = True
output_section = False
continue
elif part == "OUTPUTS":
input_section = False
output_section = True
continue
else:
if input_section:
file_key = part
if file_key not in self.files_map:
continue
self.task_input_files[task_id].append(self.files_map[file_key]["filename"])
elif output_section:
file_key = part
if file_key not in self.files_map:
continue
self.task_output_files[task_id].append(self.files_map[file_key]["filename"])

def _construct_workflow(self) -> None:
# Create files and put them in a map
Expand All @@ -288,7 +275,6 @@ def _construct_workflow(self) -> None:

# Create all tasks
task_map = {}
# print(self.task_runtimes[16])
for task_id in self.known_task_ids:
task_name = "Task_%d" % task_id
task = Task(name=task_name,
Expand Down