72 changes: 37 additions & 35 deletions spectragraph-core/src/spectragraph_core/core/orchestrator.py
@@ -29,6 +29,7 @@ def __init__(
self.transform_branches = transform_branches
self.transforms = {} # Map of nodeId -> transform instance
self.execution_log_file = None # Path to the execution log file
self.execution_log_data = None  # In-memory copy of the execution log
self._create_execution_log()
self._load_transforms()

@@ -74,6 +75,7 @@ def _create_execution_log(self) -> None:
total_steps += 1
initial_log["summary"]["total_steps"] = total_steps

self.execution_log_data = initial_log
# Save initial log file
with open(self.execution_log_file, "w", encoding="utf-8") as f:
json.dump(initial_log, f, indent=2, ensure_ascii=False)
@@ -90,72 +92,72 @@ def _create_execution_log(self) -> None:
self.sketch_id, {"message": f"Failed to create execution log: {str(e)}"}
)
self.execution_log_file = None

def _write_log_to_disk(self) -> None:
# Helper method to write the current in-memory log to disk. Intended to run in a separate thread.

if not self.execution_log_file or not self.execution_log_data:
return

try:
with open(self.execution_log_file, "w", encoding="utf-8") as f:
json.dump(self.execution_log_data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error writing execution log to disk: {e}")

def _update_execution_log(
async def _update_execution_log(
self, step_entry: Dict[str, Any], status: str = None
) -> None:
"""
Update the execution log file with a new step entry or status update.
Uses asyncio.to_thread for non-blocking I/O.
"""
if not self.execution_log_file:
if not self.execution_log_file or not self.execution_log_data:
return

try:
# Read current log
with open(self.execution_log_file, "r", encoding="utf-8") as f:
log_data = json.load(f)

# Update in-memory data
log_data = self.execution_log_data

# Update timestamp
log_data["updated_at"] = datetime.now().isoformat()

# Update status if provided
if status:
log_data["status"] = status

# Add step entry if provided
if step_entry:
log_data["execution_log"].append(step_entry)

# Update summary
if step_entry["status"] == "completed":
log_data["summary"]["completed_steps"] += 1
elif step_entry["status"] == "error":
log_data["summary"]["failed_steps"] += 1

if "execution_time_ms" in step_entry:
log_data["summary"]["total_execution_time_ms"] += step_entry[
"execution_time_ms"
]

# Write updated log
with open(self.execution_log_file, "w", encoding="utf-8") as f:
json.dump(log_data, f, indent=2, ensure_ascii=False)

# Write updated log to disk asynchronously
await asyncio.to_thread(self._write_log_to_disk)
except Exception as e:
Logger.error(
self.sketch_id, {"message": f"Failed to update execution log: {str(e)}"}
)

def _finalize_execution_log(self, final_results: Dict[str, Any]) -> None:
"""
Finalize the execution log with final results and status.
"""
if not self.execution_log_file:
async def _finalize_execution_log(self, final_results: Dict[str, Any]) -> None:
# Finalize the execution log with final results and status.

if not self.execution_log_file or not self.execution_log_data:
return

try:
# Read current log
with open(self.execution_log_file, "r", encoding="utf-8") as f:
log_data = json.load(f)

# Update in-memory data
log_data = self.execution_log_data

# Update final data
log_data["updated_at"] = datetime.now().isoformat()
log_data["status"] = "completed"
log_data["final_results"] = to_json_serializable(final_results)

# Write final log
with open(self.execution_log_file, "w", encoding="utf-8") as f:
json.dump(log_data, f, indent=2, ensure_ascii=False)
# Write final log asynchronously
await asyncio.to_thread(self._write_log_to_disk)

Logger.info(
self.sketch_id,
@@ -348,7 +350,7 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]:
The actual async implementation of the scan logic
"""
# Update execution log to indicate scan has started
self._update_execution_log(None, "running")
await self._update_execution_log(None, "running")

results = {"initial_values": values, "branches": [], "results": {}}
Logger.pending(self.sketch_id, {"message": "Starting transform..."})
@@ -418,7 +420,7 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]:
log_entry["execution_time_ms"] = int(
(time.time() - step_start_time) * 1000
)
self._update_execution_log(log_entry)
await self._update_execution_log(log_entry)
branch_results["steps"].append(step_result)
continue

@@ -465,7 +467,7 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]:
(time.time() - step_start_time) * 1000
)
results["results"][node_id] = {"error": error_msg}
self._update_execution_log(log_entry)
await self._update_execution_log(log_entry)
return results

except Exception as e:
@@ -478,11 +480,11 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]:
(time.time() - step_start_time) * 1000
)
results["results"][node_id] = {"error": error_msg}
self._update_execution_log(log_entry)
await self._update_execution_log(log_entry)
return results

# Update execution log with this step
self._update_execution_log(log_entry)
await self._update_execution_log(log_entry)
branch_results["steps"].append(step_result)

results["branches"].append(branch_results)
@@ -495,6 +497,6 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]:
results["reference_mapping"] = results_mapping

# Finalize execution log
self._finalize_execution_log(results)
await self._finalize_execution_log(results)

return results
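
For reference, a minimal sketch (not the PR code) of the pattern this change introduces: the orchestrator keeps the execution log as an in-memory dict and hands the blocking JSON write to asyncio.to_thread, so awaiting a log update no longer blocks the event loop on file I/O. The class name, file name, and fields below are illustrative assumptions, not part of the PR.

import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict


class AsyncExecutionLog:
    """Illustrative in-memory execution log with non-blocking disk flushes."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)
        # In-memory copy of the log; the only state the async code mutates.
        self.data: Dict[str, Any] = {
            "created_at": datetime.now().isoformat(),
            "status": "running",
            "execution_log": [],
        }

    def _write_to_disk(self) -> None:
        # Blocking write; intended to run in a worker thread.
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump(self.data, f, indent=2, ensure_ascii=False)

    async def update(self, step_entry: Dict[str, Any]) -> None:
        # Mutate the in-memory log, then flush it off the event loop.
        self.data["updated_at"] = datetime.now().isoformat()
        self.data["execution_log"].append(step_entry)
        await asyncio.to_thread(self._write_to_disk)


async def main() -> None:
    log = AsyncExecutionLog("execution_log.json")
    await log.update({"node_id": "demo", "status": "completed"})


if __name__ == "__main__":
    asyncio.run(main())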