diff --git a/app.py b/app.py index e2d60b0..58a9c02 100644 --- a/app.py +++ b/app.py @@ -116,6 +116,24 @@ def handle_validation_error(e): active_processes = {} active_processes_lock = threading.Lock() +file_lock = threading.Lock() + +def atomic_json_write(path, data): + with file_lock: + dir_name = os.path.dirname(path) or "." + + with tempfile.NamedTemporaryFile( + mode='w', + delete=False, + dir=dir_name, + encoding='utf-8' + ) as tmp_file: + + json.dump(data, tmp_file, indent=2) + temp_name = tmp_file.name + + os.replace(temp_name, path) + def validate_workspace_snapshot(data): if not isinstance(data, dict): @@ -167,8 +185,7 @@ def save_workspace_state(data): } try: - with open(WORKSPACE_STATE_FILE, "w", encoding="utf-8") as f: - json.dump(payload, f, indent=2) + atomic_json_write(WORKSPACE_STATE_FILE, payload) return True, None except Exception as e: return False, str(e) @@ -622,10 +639,7 @@ def save_command_history(command): # Keep latest 200 history = history[:200] - - with open(COMMAND_HISTORY_FILE, "w", encoding="utf-8") as f: - json.dump(history, f, indent=2) - + atomic_json_write(COMMAND_HISTORY_FILE, history) def _load_history_entries(query="", status="all", kind="all", limit=200): entries = _read_jsonl(HISTORY_FILE) @@ -2447,12 +2461,12 @@ def get_command_history(): def clear_command_history(): try: # Overwrite the history JSON file with an empty array - with open(COMMAND_HISTORY_FILE, "w", encoding="utf-8") as f: - json.dump([], f, indent=2) - - return jsonify( - {"success": True, "message": "Command history cleared successfully"} - ) + atomic_json_write(COMMAND_HISTORY_FILE, []) + + return jsonify({ + 'success': True, + 'message': 'Command history cleared successfully' + }) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @@ -2800,9 +2814,8 @@ def save_workspace_profile(): } try: - with open(profile_path, "w", encoding="utf-8") as f: - json.dump(payload, f, indent=2) - return jsonify({"success": True}) + atomic_json_write(profile_path, payload) + return jsonify({'success': True}) except Exception as e: return jsonify({"success": False, "error": str(e)}), 500 @@ -3231,6 +3244,14 @@ def run_script(): run_id = str(uuid.uuid4())[:8] shell_cmd = _find_shell() + execution = _start_execution_record( + kind='script', + display_name=rel_path, + command_text=f'{shell_cmd} {full_path}', + shell_cmd=shell_cmd, + cwd=SCRIPTS_DIR, + ) + def generate(): proc = None run_path = full_path @@ -3258,6 +3279,7 @@ def generate(): if steps: temp_dir = os.path.dirname(full_path) + temp_fd, temp_path = tempfile.mkstemp( suffix=".sh", prefix=".tmp_run_", dir=temp_dir ) @@ -3267,6 +3289,7 @@ def generate(): temp_f.write(instrumented_content) run_path = temp_path + else: run_path = full_path @@ -3304,6 +3327,32 @@ def generate(): "stop_event": stop_event, } + metrics = {'cpu': 0.0, 'mem': 0.0} + + t_metrics = threading.Thread( + target=_track_metrics, + args=(proc, metrics) + ) + + t_metrics.start() + + _append_execution_line( + execution, + 'system', + f'Starting script execution... (ID: {run_id})' + ) + + payload = { + 'type': 'started', + 'run_id': run_id, + 'content': f'Starting script execution... (ID: {run_id})\n' + } + + yield f"data: {json.dumps(payload)}\n\n" + + for line in iter(proc.stdout.readline, ''): + if line: + metrics = {"cpu": 0.0, "mem": 0.0} t_metrics = threading.Thread( target=_track_metrics, args=(proc, metrics, stop_event) @@ -3348,8 +3397,12 @@ def stream_reader(stream, q): if run_path != full_path: temp_basename = os.path.basename(run_path) orig_basename = os.path.basename(full_path) + if temp_basename in line: - line = line.replace(temp_basename, orig_basename) + line = line.replace( + temp_basename, + orig_basename + ) if "::progress::" in line: match = re.search(r"::progress::(\d+)::(\d+)::(.*)", line) @@ -3357,18 +3410,49 @@ def stream_reader(stream, q): step_idx = int(match.group(1)) total_steps = int(match.group(2)) cmd_text = match.group(3).strip() - yield "data: " + json.dumps( - { - "type": "progress", - "step": step_idx, - "total": total_steps, - "command": cmd_text, - } - ) + "\n\n" + + payload = { + 'type': 'progress', + 'step': step_idx, + 'total': total_steps, + 'command': cmd_text + } + + yield f"data: {json.dumps(payload)}\n\n" continue - # Heuristic to detect errors in the combined stream l_lower = line.lower() + msg_type = 'stdout' + + if any( + err in l_lower + for err in [ + 'error:', + 'failed:', + 'not found', + 'denied', + 'no such file' + ] + ): + msg_type = 'error' + + _append_execution_line( + execution, + msg_type, + line + ) + + payload = { + 'type': msg_type, + 'content': line + } + + yield f"data: {json.dumps(payload)}\n\n" + + proc.stdout.close() + proc.wait(timeout=10) + + t_metrics.join(timeout=1) msg_type = "stdout" if any( err in l_lower @@ -3447,6 +3531,7 @@ def stream_reader(stream, q): elapsed = end_time - start_time was_aborted = False + with active_processes_lock: entry = active_processes.get(run_id) if entry and entry.get("aborted"): @@ -3459,12 +3544,23 @@ def stream_reader(stream, q): _finalize_execution( execution, success=False, - exit_code=proc.returncode if proc.returncode is not None else -15, + exit_code=( + proc.returncode + if proc.returncode is not None + else -15 + ), duration_seconds=elapsed, error_message="Script aborted by user", ) - abort_msg = 'Script aborted\n' - yield f"data: {json.dumps({'type': 'aborted', 'run_id': run_id, 'content': abort_msg})}\n\n" + + payload = { + 'type': 'aborted', + 'run_id': run_id, + 'content': 'Script aborted\n' + } + + yield f"data: {json.dumps(payload)}\n\n" + else: system_mem = psutil.virtual_memory().total / (1024 * 1024) mem_percent = ( @@ -3493,6 +3589,94 @@ def stream_reader(stream, q): duration_seconds=elapsed, resource_usage=resource_info, ) + + payload = { + 'type': 'metrics', + 'resources': resource_info, + 'exit_code': proc.returncode, + 'success': proc.returncode == 0 + } + + yield f"data: {json.dumps(payload)}\n\n" + + except subprocess.TimeoutExpired: + + was_aborted = False + + with active_processes_lock: + entry = active_processes.get(run_id) + + if entry and entry.get('aborted'): + was_aborted = True + + if proc: + _terminate_process_tree(proc) + + if was_aborted: + + _append_execution_line( + execution, + 'system', + f'Script aborted (exit code {proc.returncode})' + ) + + _finalize_execution( + execution, + success=False, + exit_code=( + proc.returncode + if proc and proc.returncode is not None + else -15 + ), + duration_seconds=time.time() - start_time, + error_message='Script aborted by user', + ) + + payload = { + 'type': 'aborted', + 'run_id': run_id, + 'content': 'Script aborted\n' + } + + yield f"data: {json.dumps(payload)}\n\n" + + else: + + _append_execution_line( + execution, + 'error', + '❌ Execution timed out' + ) + + _finalize_execution( + execution, + success=False, + exit_code=-1, + duration_seconds=time.time() - start_time, + error_message='Process timed out', + ) + + payload = { + 'type': 'error', + 'content': '❌ Execution timed out\n' + } + + yield f"data: {json.dumps(payload)}\n\n" + + except Exception as e: + + _append_execution_line( + execution, + 'error', + f'❌ Execution Error: {str(e)}' + ) + + if proc is not None and getattr(proc, 'returncode', None) is not None: + exit_code = proc.returncode + else: + exit_code = -1 + + _finalize_execution( yield "data: " + json.dumps( { "type": "metrics", @@ -3547,6 +3731,26 @@ def stream_reader(stream, q): stop_event=stop_event, reader_thread=t_reader, ) + + payload = { + 'type': 'error', + 'content': f'❌ Execution Error: {str(e)}' + } + + yield f"data: {json.dumps(payload)}\n\n" + + finally: + + if 'run_path' in locals() and run_path != full_path: + try: + if os.path.exists(run_path): + os.remove(run_path) + except Exception: + pass + + with active_processes_lock: + if run_id in active_processes: + del active_processes[run_id] yield "data: " + json.dumps( {"type": "error", "content": f"❌ Execution Error: {str(e)}"} ) + "\n\n" @@ -3789,6 +3993,8 @@ def stream_reader(stream, q): error_message=str(e), reader_thread=t_reader, ) + yield f"data: {json.dumps({'type': 'error', 'content': f'❌ Command Error: {str(e)}'})}\n\n" + return Response(generate(), mimetype='text/event-stream') yield "data: " + json.dumps( {"type": "error", "content": f"❌ Command Error: {str(e)}"} ) + "\n\n"