Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 234 additions & 28 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ def handle_validation_error(e):
active_processes = {}
active_processes_lock = threading.Lock()

file_lock = threading.Lock()

def atomic_json_write(path, data):
with file_lock:
dir_name = os.path.dirname(path) or "."

with tempfile.NamedTemporaryFile(
mode='w',
delete=False,
dir=dir_name,
encoding='utf-8'
) as tmp_file:

json.dump(data, tmp_file, indent=2)
temp_name = tmp_file.name

os.replace(temp_name, path)


def validate_workspace_snapshot(data):
if not isinstance(data, dict):
Expand Down Expand Up @@ -167,8 +185,7 @@ def save_workspace_state(data):
}

try:
with open(WORKSPACE_STATE_FILE, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2)
atomic_json_write(WORKSPACE_STATE_FILE, payload)
return True, None
except Exception as e:
return False, str(e)
Expand Down Expand Up @@ -622,10 +639,7 @@ def save_command_history(command):

# Keep latest 200
history = history[:200]

with open(COMMAND_HISTORY_FILE, "w", encoding="utf-8") as f:
json.dump(history, f, indent=2)

atomic_json_write(COMMAND_HISTORY_FILE, history)

def _load_history_entries(query="", status="all", kind="all", limit=200):
entries = _read_jsonl(HISTORY_FILE)
Expand Down Expand Up @@ -2447,12 +2461,12 @@ def get_command_history():
def clear_command_history():
try:
# Overwrite the history JSON file with an empty array
with open(COMMAND_HISTORY_FILE, "w", encoding="utf-8") as f:
json.dump([], f, indent=2)

return jsonify(
{"success": True, "message": "Command history cleared successfully"}
)
atomic_json_write(COMMAND_HISTORY_FILE, [])

return jsonify({
'success': True,
'message': 'Command history cleared successfully'
})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500

Expand Down Expand Up @@ -2800,9 +2814,8 @@ def save_workspace_profile():
}

try:
with open(profile_path, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2)
return jsonify({"success": True})
atomic_json_write(profile_path, payload)
return jsonify({'success': True})
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500

Expand Down Expand Up @@ -3231,6 +3244,14 @@ def run_script():
run_id = str(uuid.uuid4())[:8]
shell_cmd = _find_shell()

execution = _start_execution_record(
kind='script',
display_name=rel_path,
command_text=f'{shell_cmd} {full_path}',
shell_cmd=shell_cmd,
cwd=SCRIPTS_DIR,
)

def generate():
proc = None
run_path = full_path
Expand Down Expand Up @@ -3258,6 +3279,7 @@ def generate():

if steps:
temp_dir = os.path.dirname(full_path)

temp_fd, temp_path = tempfile.mkstemp(
suffix=".sh", prefix=".tmp_run_", dir=temp_dir
)
Expand All @@ -3267,6 +3289,7 @@ def generate():
temp_f.write(instrumented_content)

run_path = temp_path

else:
run_path = full_path

Expand Down Expand Up @@ -3304,6 +3327,32 @@ def generate():
"stop_event": stop_event,
}

metrics = {'cpu': 0.0, 'mem': 0.0}

t_metrics = threading.Thread(
target=_track_metrics,
args=(proc, metrics)
)

t_metrics.start()

_append_execution_line(
execution,
'system',
f'Starting script execution... (ID: {run_id})'
)

payload = {
'type': 'started',
'run_id': run_id,
'content': f'Starting script execution... (ID: {run_id})\n'
}

yield f"data: {json.dumps(payload)}\n\n"

for line in iter(proc.stdout.readline, ''):
if line:

metrics = {"cpu": 0.0, "mem": 0.0}
t_metrics = threading.Thread(
target=_track_metrics, args=(proc, metrics, stop_event)
Expand Down Expand Up @@ -3348,27 +3397,62 @@ def stream_reader(stream, q):
if run_path != full_path:
temp_basename = os.path.basename(run_path)
orig_basename = os.path.basename(full_path)

if temp_basename in line:
line = line.replace(temp_basename, orig_basename)
line = line.replace(
temp_basename,
orig_basename
)

if "::progress::" in line:
match = re.search(r"::progress::(\d+)::(\d+)::(.*)", line)
if match:
step_idx = int(match.group(1))
total_steps = int(match.group(2))
cmd_text = match.group(3).strip()
yield "data: " + json.dumps(
{
"type": "progress",
"step": step_idx,
"total": total_steps,
"command": cmd_text,
}
) + "\n\n"

payload = {
'type': 'progress',
'step': step_idx,
'total': total_steps,
'command': cmd_text
}

yield f"data: {json.dumps(payload)}\n\n"
continue

# Heuristic to detect errors in the combined stream
l_lower = line.lower()
msg_type = 'stdout'

if any(
err in l_lower
for err in [
'error:',
'failed:',
'not found',
'denied',
'no such file'
]
):
msg_type = 'error'

_append_execution_line(
execution,
msg_type,
line
)

payload = {
'type': msg_type,
'content': line
}

yield f"data: {json.dumps(payload)}\n\n"

proc.stdout.close()
proc.wait(timeout=10)

t_metrics.join(timeout=1)
msg_type = "stdout"
if any(
err in l_lower
Expand Down Expand Up @@ -3447,6 +3531,7 @@ def stream_reader(stream, q):
elapsed = end_time - start_time

was_aborted = False

with active_processes_lock:
entry = active_processes.get(run_id)
if entry and entry.get("aborted"):
Expand All @@ -3459,12 +3544,23 @@ def stream_reader(stream, q):
_finalize_execution(
execution,
success=False,
exit_code=proc.returncode if proc.returncode is not None else -15,
exit_code=(
proc.returncode
if proc.returncode is not None
else -15
),
duration_seconds=elapsed,
error_message="Script aborted by user",
)
abort_msg = 'Script aborted\n'
yield f"data: {json.dumps({'type': 'aborted', 'run_id': run_id, 'content': abort_msg})}\n\n"

payload = {
'type': 'aborted',
'run_id': run_id,
'content': 'Script aborted\n'
}

yield f"data: {json.dumps(payload)}\n\n"

else:
system_mem = psutil.virtual_memory().total / (1024 * 1024)
mem_percent = (
Expand Down Expand Up @@ -3493,6 +3589,94 @@ def stream_reader(stream, q):
duration_seconds=elapsed,
resource_usage=resource_info,
)

payload = {
'type': 'metrics',
'resources': resource_info,
'exit_code': proc.returncode,
'success': proc.returncode == 0
}

yield f"data: {json.dumps(payload)}\n\n"

except subprocess.TimeoutExpired:

was_aborted = False

with active_processes_lock:
entry = active_processes.get(run_id)

if entry and entry.get('aborted'):
was_aborted = True

if proc:
_terminate_process_tree(proc)

if was_aborted:

_append_execution_line(
execution,
'system',
f'Script aborted (exit code {proc.returncode})'
)

_finalize_execution(
execution,
success=False,
exit_code=(
proc.returncode
if proc and proc.returncode is not None
else -15
),
duration_seconds=time.time() - start_time,
error_message='Script aborted by user',
)

payload = {
'type': 'aborted',
'run_id': run_id,
'content': 'Script aborted\n'
}

yield f"data: {json.dumps(payload)}\n\n"

else:

_append_execution_line(
execution,
'error',
'❌ Execution timed out'
)

_finalize_execution(
execution,
success=False,
exit_code=-1,
duration_seconds=time.time() - start_time,
error_message='Process timed out',
)

payload = {
'type': 'error',
'content': '❌ Execution timed out\n'
}

yield f"data: {json.dumps(payload)}\n\n"

except Exception as e:

_append_execution_line(
execution,
'error',
f'❌ Execution Error: {str(e)}'
)

if proc is not None and getattr(proc, 'returncode', None) is not None:
exit_code = proc.returncode
else:
exit_code = -1

_finalize_execution(
yield "data: " + json.dumps(
{
"type": "metrics",
Expand Down Expand Up @@ -3547,6 +3731,26 @@ def stream_reader(stream, q):
stop_event=stop_event,
reader_thread=t_reader,
)

payload = {
'type': 'error',
'content': f'❌ Execution Error: {str(e)}'
}

yield f"data: {json.dumps(payload)}\n\n"

finally:

if 'run_path' in locals() and run_path != full_path:
try:
if os.path.exists(run_path):
os.remove(run_path)
except Exception:
pass

with active_processes_lock:
if run_id in active_processes:
del active_processes[run_id]
yield "data: " + json.dumps(
{"type": "error", "content": f"❌ Execution Error: {str(e)}"}
) + "\n\n"
Expand Down Expand Up @@ -3789,6 +3993,8 @@ def stream_reader(stream, q):
error_message=str(e),
reader_thread=t_reader,
)
yield f"data: {json.dumps({'type': 'error', 'content': f'❌ Command Error: {str(e)}'})}\n\n"
return Response(generate(), mimetype='text/event-stream')
yield "data: " + json.dumps(
{"type": "error", "content": f"❌ Command Error: {str(e)}"}
) + "\n\n"
Expand Down