From ec824d7ef940253181e2cdb81b1243f122cf1365 Mon Sep 17 00:00:00 2001 From: Akshit Sahore Date: Fri, 9 Jan 2026 14:57:18 +0530 Subject: [PATCH] fix : optimize orchestrator logging with async non-blocking I/O .Replaces blocking synchronous file writes with in-memory state and background disk updates --- .../spectragraph_core/core/orchestrator.py | 72 ++++++++++--------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/spectragraph-core/src/spectragraph_core/core/orchestrator.py b/spectragraph-core/src/spectragraph_core/core/orchestrator.py index 4797e0a..67113b7 100644 --- a/spectragraph-core/src/spectragraph_core/core/orchestrator.py +++ b/spectragraph-core/src/spectragraph_core/core/orchestrator.py @@ -29,6 +29,7 @@ def __init__( self.transform_branches = transform_branches self.transforms = {} # Map of nodeId -> transform instance self.execution_log_file = None # Path to the execution log file + self.execution_log_data=None self._create_execution_log() self._load_transforms() @@ -74,6 +75,7 @@ def _create_execution_log(self) -> None: total_steps += 1 initial_log["summary"]["total_steps"] = total_steps + self.execution_log_data = initial_log # Save initial log file with open(self.execution_log_file, "w", encoding="utf-8") as f: json.dump(initial_log, f, indent=2, ensure_ascii=False) @@ -90,72 +92,72 @@ def _create_execution_log(self) -> None: self.sketch_id, {"message": f"Failed to create execution log: {str(e)}"} ) self.execution_log_file = None + + def _write_log_to_disk(self) -> None: + # This Helper method to write the current in-memory log to disk.Intended to be run in a separate thread. + + if not self.execution_log_file or not self.execution_log_data: + return + + try: + with open(self.execution_log_file, "w", encoding="utf-8") as f: + json.dump(self.execution_log_data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f"Error writing execution log to disk: {e}") - def _update_execution_log( + async def _update_execution_log( self, step_entry: Dict[str, Any], status: str = None ) -> None: """ Update the execution log file with a new step entry or status update. + Uses asyncio.to_thread for non-blocking I/O. """ - if not self.execution_log_file: + if not self.execution_log_file or not self.execution_log_data: return - try: - # Read current log - with open(self.execution_log_file, "r", encoding="utf-8") as f: - log_data = json.load(f) - + # Update in-memory data + log_data = self.execution_log_data + # Update timestamp log_data["updated_at"] = datetime.now().isoformat() - # Update status if provided if status: log_data["status"] = status - # Add step entry if provided if step_entry: log_data["execution_log"].append(step_entry) - # Update summary if step_entry["status"] == "completed": log_data["summary"]["completed_steps"] += 1 elif step_entry["status"] == "error": log_data["summary"]["failed_steps"] += 1 - if "execution_time_ms" in step_entry: log_data["summary"]["total_execution_time_ms"] += step_entry[ "execution_time_ms" ] - - # Write updated log - with open(self.execution_log_file, "w", encoding="utf-8") as f: - json.dump(log_data, f, indent=2, ensure_ascii=False) - + # Write updated log to disk asynchronously + await asyncio.to_thread(self._write_log_to_disk) except Exception as e: Logger.error( self.sketch_id, {"message": f"Failed to update execution log: {str(e)}"} ) - def _finalize_execution_log(self, final_results: Dict[str, Any]) -> None: - """ - Finalize the execution log with final results and status. - """ - if not self.execution_log_file: + async def _finalize_execution_log(self, final_results: Dict[str, Any]) -> None: + #Finalize the execution log with final results and status. + + if not self.execution_log_file or not self.execution_log_data: return - try: - # Read current log - with open(self.execution_log_file, "r", encoding="utf-8") as f: - log_data = json.load(f) - + # Update in-memory data + log_data = self.execution_log_data + # Update final data log_data["updated_at"] = datetime.now().isoformat() log_data["status"] = "completed" log_data["final_results"] = to_json_serializable(final_results) - # Write final log - with open(self.execution_log_file, "w", encoding="utf-8") as f: - json.dump(log_data, f, indent=2, ensure_ascii=False) + # Write final log asynchronously + await asyncio.to_thread(self._write_log_to_disk) Logger.info( self.sketch_id, @@ -348,7 +350,7 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]: The actual async implementation of the scan logic """ # Update execution log to indicate scan has started - self._update_execution_log(None, "running") + await self._update_execution_log(None, "running") results = {"initial_values": values, "branches": [], "results": {}} Logger.pending(self.sketch_id, {"message": "Starting transform..."}) @@ -418,7 +420,7 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]: log_entry["execution_time_ms"] = int( (time.time() - step_start_time) * 1000 ) - self._update_execution_log(log_entry) + await self._update_execution_log(log_entry) branch_results["steps"].append(step_result) continue @@ -465,7 +467,7 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]: (time.time() - step_start_time) * 1000 ) results["results"][node_id] = {"error": error_msg} - self._update_execution_log(log_entry) + await self._update_execution_log(log_entry) return results except Exception as e: @@ -478,11 +480,11 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]: (time.time() - step_start_time) * 1000 ) results["results"][node_id] = {"error": error_msg} - self._update_execution_log(log_entry) + await self._update_execution_log(log_entry) return results # Update execution log with this step - self._update_execution_log(log_entry) + await self._update_execution_log(log_entry) branch_results["steps"].append(step_result) results["branches"].append(branch_results) @@ -495,6 +497,6 @@ async def _async_scan(self, values: List[str]) -> Dict[str, Any]: results["reference_mapping"] = results_mapping # Finalize execution log - self._finalize_execution_log(results) + await self._finalize_execution_log(results) return results