diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 211cda2..3a4975c 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -58,11 +58,8 @@ body: description: Confirm the following before submitting. options: - label: I have read the contributing guidelines - value: read_contributing - label: I have checked for duplicate issues - value: checked_duplicates - label: I would like to work on a pull request to fix this - value: will_work_on_pr - type: textarea id: additional diff --git a/.github/ISSUE_TEMPLATE/docs_report.yml b/.github/ISSUE_TEMPLATE/docs_report.yml index c70a0b6..4482d7b 100644 --- a/.github/ISSUE_TEMPLATE/docs_report.yml +++ b/.github/ISSUE_TEMPLATE/docs_report.yml @@ -44,11 +44,8 @@ body: description: Confirm the following before submitting. options: - label: I have read the contributing guidelines - value: read_contributing - label: I have checked for duplicate issues - value: checked_duplicates - label: I would like to work on a pull request to fix this - value: will_work_on_pr - type: textarea id: additional diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml index 81ecdd2..42d2108 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yml +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -60,11 +60,8 @@ body: description: Confirm the following before submitting. options: - label: I have read the contributing guidelines - value: read_contributing - label: I have checked for duplicate issues - value: checked_duplicates - label: I would like to work on a pull request to implement this - value: will_work_on_pr - type: textarea id: additional diff --git a/.github/ISSUE_TEMPLATE/testing.yml b/.github/ISSUE_TEMPLATE/testing.yml index 3584c70..ce9f374 100644 --- a/.github/ISSUE_TEMPLATE/testing.yml +++ b/.github/ISSUE_TEMPLATE/testing.yml @@ -57,11 +57,8 @@ body: description: Confirm the following before submitting. options: - label: I have read the contributing guidelines - value: read_contributing - label: I have checked for duplicate issues - value: checked_duplicates - label: I would like to work on a pull request to fix this - value: will_work_on_pr - type: textarea id: additional diff --git a/tests/test_app_helpers.py b/tests/test_app_helpers.py new file mode 100644 index 0000000..e07a88f --- /dev/null +++ b/tests/test_app_helpers.py @@ -0,0 +1,139 @@ +import os +import json +import time +import shutil +import tempfile +from unittest.mock import patch + +import pytest + + +def make_history_entry(id, display_name, success, duration, finished_at=None): + return { + "id": id, + "display_name": display_name, + "kind": "script", + "success": success, + "failure_type": None if success else "shell_error", + "duration_seconds": duration, + "finished_at": finished_at or "2026-06-03T00:00:00Z", + } + + +def test_trim_jsonl_trims_file(app_module, tmp_path): + f = tmp_path / "j.jsonl" + with open(f, "w", encoding="utf-8") as fh: + for i in range(5): + fh.write(json.dumps({"i": i}) + "\n") + + app_module._trim_jsonl(str(f), 3) + lines = open(f, "r", encoding="utf-8").read().strip().splitlines() + assert len(lines) == 3 + + +def test_parse_execution_log_metadata_reads_headers(app_module, tmp_path): + exec_dir = tmp_path / "exec" + exec_dir.mkdir() + log = exec_dir / "20260603_script_test.log" + with open(log, "w", encoding="utf-8") as fh: + fh.write("[2026-06-03T00:00:00Z] execution started\n") + fh.write("kind: script\n") + fh.write("id: abc123\n") + fh.write("display: myscript\n") + fh.write("exit_code: 2\n") + fh.write("status: failed\n") + + with patch.object(app_module, "EXECUTION_LOG_DIR", str(exec_dir)): + meta = app_module._parse_execution_log_metadata(os.path.basename(str(log))) + assert meta is not None + assert meta.get("status") == "failed" + assert meta.get("exit_code") == 2 + + +def test_start_and_append_excerpt_trimming(app_module, tmp_path): + exec_dir = tmp_path / "exec" + sess_dir = tmp_path / "sessions" + exec_dir.mkdir() + sess_dir.mkdir() + + with patch.object(app_module, "EXECUTION_LOG_DIR", str(exec_dir)), patch.object(app_module, "SESSION_LOG_DIR", str(sess_dir)), patch.object(app_module, "MAX_HISTORY_EXCERPT_CHARS", 50): + execution = app_module._start_execution_record(kind="script", display_name="long", command_text="echo") + for i in range(10): + app_module._append_execution_line(execution, "stdout", "line-with-data-" + str(i) + "\n") + assert execution["excerpt_size"] <= 50 + + +def test_record_reliability_and_rebuild_summary(app_module, tmp_path): + hist_file = tmp_path / "history.jsonl" + failed_file = tmp_path / "failed.jsonl" + rel_dir = tmp_path / "reliability" + rel_dir.mkdir() + session_dir = tmp_path / "sessions" + exec_dir = tmp_path / "exec" + session_dir.mkdir() + exec_dir.mkdir() + + entries = [make_history_entry(f"id{i}", "myscript", i % 2 == 0, i * 0.5) for i in range(1, 6)] + with open(hist_file, "w", encoding="utf-8") as fh: + for e in entries: + fh.write(json.dumps(e) + "\n") + + with patch.object(app_module, "HISTORY_FILE", str(hist_file)), patch.object(app_module, "FAILED_HISTORY_FILE", str(failed_file)), patch.object(app_module, "RELIABILITY_DIR", str(rel_dir)), patch.object(app_module, "RELIABILITY_EVENTS_FILE", str(rel_dir / "events.jsonl")), patch.object(app_module, "RELIABILITY_SUMMARY_FILE", str(rel_dir / "summary.json")), patch.object(app_module, "SESSION_LOG_DIR", str(session_dir)), patch.object(app_module, "EXECUTION_LOG_DIR", str(exec_dir)): + summary = app_module._rebuild_reliability_summary() + assert isinstance(summary, dict) + assert "scripts" in summary + total_runs = summary.get("global", {}).get("total_runs") + assert total_runs == len(entries) + + +def test_append_and_read_jsonl(app_module, tmp_path): + f = tmp_path / "a.jsonl" + app_module._append_jsonl(str(f), {"x": 1}) + app_module._append_jsonl(str(f), {"x": 2}) + entries = app_module._read_jsonl(str(f)) + assert isinstance(entries, list) + assert len(entries) == 2 + assert entries[0]["x"] == 1 + + +def test_classify_failure_various_messages(app_module): + assert app_module._classify_failure(2, error_message="syntax error: foo") == "shell_error" + assert app_module._classify_failure(127, output="command not found") == "dependency_error" + assert app_module._classify_failure(0, output="weird") == "unknown_failure" + + +def test_safe_load_json_and_isolate(app_module, tmp_path): + f = tmp_path / "bad.json" + f.write_text("notjson") + with patch.object(app_module, "WORKSPACE_DIR", str(tmp_path)): + res = app_module._safe_load_json(str(f), default={}) + assert res == {} + corrupted = str(f) + ".corrupted" + assert (not os.path.exists(str(f))) or os.path.exists(corrupted) + + +def test_session_record_from_file_and_diagnostics(app_module, tmp_path): + sess = tmp_path / "s.json" + data = {"metadata": {"id": "i1", "display_name": "d1", "finished_at": "2026-06-03T00:00:00Z", "success": True}} + sess.write_text(json.dumps(data)) + with patch.object(app_module, "SESSION_LOG_DIR", str(tmp_path)): + record = app_module._session_record_from_file(str(sess.name)) + assert record is not None + diag = app_module._diagnose_session_data(data) + assert isinstance(diag, dict) + + +def test_save_and_load_command_history(app_module, tmp_path): + cmd_file = tmp_path / "commands.json" + with patch.object(app_module, "COMMAND_HISTORY_FILE", str(cmd_file)): + app_module.save_command_history(" ") + app_module.save_command_history("one") + app_module.save_command_history("two") + app_module.save_command_history("one") + h = app_module.load_command_history() + assert h[0] == "one" or h[0] == "two" + + +def test_isolate_corrupted_file_no_exist(app_module, tmp_path): + f = tmp_path / "nope.json" + app_module._isolate_corrupted_file(str(f)) diff --git a/tests/test_execution_lifecycle.py b/tests/test_execution_lifecycle.py new file mode 100644 index 0000000..db336f2 --- /dev/null +++ b/tests/test_execution_lifecycle.py @@ -0,0 +1,107 @@ +import os +import time +import subprocess +import json +from unittest.mock import Mock, patch + + +def test_cleanup_execution_when_execution_is_none_removes_temp_and_terminates(app_module, tmp_path): + temp = tmp_path / "tmp_run.sh" + temp.write_text("echo hi") + + class FakeProc: + def __init__(self): + self.killed = False + + def poll(self): + return None + + proc = FakeProc() + + called = {} + + def fake_terminate(p): + called['terminated'] = True + + with patch.object(app_module, "_terminate_process_tree", fake_terminate): + app_module._cleanup_execution(proc, None, run_id="r1", temp_path=str(temp)) + + assert called.get('terminated') is True + assert not os.path.exists(str(temp)) + + +def test_cleanup_execution_full_flow_calls_finalize_and_closes_handle(app_module, tmp_path): + # prepare fake proc with streams + class FakeStream: + def close(self): + self.closed = True + + class FakeProc: + def __init__(self): + self.pid = 9999 + self.stdout = FakeStream() + self.stderr = FakeStream() + self.returncode = 1 + + def poll(self): + return None + + proc = FakeProc() + + # create an open file handle for execution['handle'] + logf = tmp_path / "log.txt" + h = open(logf, "w", encoding="utf-8") + + execution = { + "record": {"status": "running", "id": "x"}, + "handle": h, + "monotonic_start": time.perf_counter() - 0.5, + } + + # put an active process to be removed + with app_module.active_processes_lock: + app_module.active_processes["r2"] = {"process": proc} + + called = {} + + def fake_finalize(execution_obj, success, exit_code, duration_seconds, **kwargs): + called['finalized'] = True + return {"id": "x"} + + reader = Mock() + reader.join.side_effect = None + + with patch.object(app_module, "_finalize_execution", fake_finalize): + app_module._cleanup_execution(proc, execution, run_id="r2", temp_path=None, was_aborted=False, error_message="err", exit_code=2, stop_event=None, reader_thread=reader) + + assert called.get('finalized') is True + assert h.closed + with app_module.active_processes_lock: + assert "r2" not in app_module.active_processes + + +def test_exec_command_timeout_triggers_cleanup_and_yields_error(app_module, client, tmp_path): + # Patch check_lock to allow execution + with patch.object(app_module, "check_lock", lambda *a, **k: True), patch.object(app_module, "_find_shell", lambda: "sh"): + # fake Popen that has stdout.readline returning empty string so reader ends, and wait raises TimeoutExpired + class FakePopen: + def __init__(self, *a, **k): + class Out: + def readline(self): + return "" + + self.stdout = Out() + self.stderr = None + self.returncode = None + self.pid = 12345 + + def poll(self): + return None + + def wait(self, timeout=None): + raise subprocess.TimeoutExpired(cmd="x", timeout=timeout) + + with patch('app.subprocess.Popen', FakePopen): + resp = client.post('/api/exec', json={"command": "echo hi", "password": ""}) + data = b"".join(resp.response).decode('utf-8') + assert "Execution timed out" in data or "❌ Execution timed out" in data diff --git a/tests/test_reliability.py b/tests/test_reliability.py index e1ce75f..5373860 100644 --- a/tests/test_reliability.py +++ b/tests/test_reliability.py @@ -4,6 +4,7 @@ import subprocess import pytest import psutil +from unittest.mock import patch SENTINEL = object() @@ -323,3 +324,259 @@ def test_no_zombie_processes(app_module): ) except (psutil.NoSuchProcess, psutil.AccessDenied): pass + +def test_cleanup_old_execution_logs_prunes_old_logs(app_module, tmp_path): + old_dir = tmp_path / "exec_logs" + old_dir.mkdir() + + # create old and new files + old_file = old_dir / "old.log" + new_file = old_dir / "new.log" + old_file.write_text("old") + new_file.write_text("new") + + # set old mtime to 60 days ago + old_mtime = time.time() - (60 * 24 * 60 * 60) + os.utime(old_file, (old_mtime, old_mtime)) + + # patch EXECUTION_LOG_DIR + with patch.object(app_module, "EXECUTION_LOG_DIR", str(old_dir)): + app_module._cleanup_old_execution_logs() + + assert not old_file.exists() + assert new_file.exists() + + +def test_start_append_and_finalize_execution_record(app_module, tmp_path): + # isolate log/session/history files + exec_dir = tmp_path / "exec" + sess_dir = tmp_path / "sessions" + rel_dir = tmp_path / "reliability" + exec_dir.mkdir() + sess_dir.mkdir() + rel_dir.mkdir() + + history_file = tmp_path / "history.jsonl" + failed_file = tmp_path / "failed.jsonl" + + with patch.object(app_module, "EXECUTION_LOG_DIR", str(exec_dir)), \ + patch.object(app_module, "SESSION_LOG_DIR", str(sess_dir)), \ + patch.object(app_module, "RELIABILITY_DIR", str(rel_dir)), \ + patch.object(app_module, "HISTORY_FILE", str(history_file)), \ + patch.object(app_module, "FAILED_HISTORY_FILE", str(failed_file)): + + execution = app_module._start_execution_record( + kind="test", display_name="my test", command_text="echo hi" + ) + + # append some lines + app_module._append_execution_line(execution, "stdout", "line1\n") + app_module._append_execution_line(execution, "error", "err happened\n") + + # finalize as success + history = app_module._finalize_execution( + execution, success=True, exit_code=0, duration_seconds=0.123 + ) + + assert history["success"] is True + # session file should exist + session_path = os.path.join(str(sess_dir), history["log_file"].split('_')[-1].replace('.log', '.json')) + sess_files = list(sess_dir.glob('*.json')) + assert len(sess_files) >= 1 + + +def test_save_reliability_summary_creates_backup_and_atomic_replace(app_module, tmp_path): + rel_dir = tmp_path / "reliability" + rel_dir.mkdir() + summary_file = rel_dir / "summary.json" + backup_file = rel_dir / "summary.json.backup" + tmp_file = rel_dir / "summary.json.tmp" + + # initial summary + summary = {"version": app_module.RELIABILITY_SUMMARY_VERSION, "scripts": {}, "global": {}} + + with patch.object(app_module, "RELIABILITY_DIR", str(rel_dir)), patch.object(app_module, "RELIABILITY_SUMMARY_FILE", str(summary_file)), patch.object(app_module, "RELIABILITY_SUMMARY_BACKUP", str(backup_file)), patch.object(app_module, "RELIABILITY_SUMMARY_TMP", str(tmp_file)): + # First save should create file + app_module._save_reliability_summary(summary) + assert summary_file.exists() + + # Modify and save again to exercise backup creation + summary2 = {"version": app_module.RELIABILITY_SUMMARY_VERSION, "scripts": {"a": {}}, "global": {}} + app_module._save_reliability_summary(summary2) + # backup may or may not exist depending on save path, but tmp should not remain + assert not tmp_file.exists() + + +def test_record_reliability_event_appends_and_trims(app_module, tmp_path): + rel_dir = tmp_path / "reliability" + rel_dir.mkdir() + events_file = rel_dir / "events.jsonl" + + with patch.object(app_module, "RELIABILITY_DIR", str(rel_dir)), patch.object(app_module, "RELIABILITY_EVENTS_FILE", str(events_file)): + # append a few events + for i in range(5): + app_module._record_reliability_event({"id": f"e{i}", "info": i}, persist_force=False) + + lines = app_module._read_jsonl(str(events_file)) + assert len(lines) == 5 + + # Simulate trim by lowering MAX_RELIABILITY_EVENTS and adding more + with patch.object(app_module, "MAX_RELIABILITY_EVENTS", 3): + for i in range(3): + app_module._record_reliability_event({"id": f"x{i}", "info": i}, persist_force=False) + + lines2 = app_module._read_jsonl(str(events_file)) + assert len(lines2) <= 3 + + +def test_load_workspace_state_handles_corrupted_file(app_module, tmp_path): + ws_file = tmp_path / "workspace_state.json" + # write invalid json + ws_file.write_text('{ invalid json }') + + with patch.object(app_module, "WORKSPACE_STATE_FILE", str(ws_file)): + result = app_module.load_workspace_state() + + assert isinstance(result, dict) + assert result.get("corrupted") is True + corrupted_path = str(ws_file) + ".corrupted" + assert os.path.exists(corrupted_path) + assert not os.path.exists(str(ws_file)) + + +def test_save_favorites_raises_on_ioerror(app_module, tmp_path): + fav_file = tmp_path / "favorites.json" + with patch.object(app_module, "FAVORITES_FILE", str(fav_file)): + with patch("builtins.open", side_effect=OSError("disk full")): + with pytest.raises(OSError): + app_module.save_favorites(["one"]) + + +def test_save_sessions_function_raises_on_ioerror(app_module, tmp_path): + sess_file = tmp_path / "sessions.json" + with patch.object(app_module, "SESSIONS_FILE", str(sess_file)): + with patch("builtins.open", side_effect=OSError("write error")): + with pytest.raises(OSError): + app_module.save_sessions({}) + + +def test_save_session_route_returns_500_on_save_error(client, app_module, tmp_path): + with patch.object(app_module, "save_sessions", side_effect=Exception("boom")): + r = client.post("/api/sessions/save", json={"session": {"x": 1}}) + assert r.status_code == 500 + body = r.get_json() + assert body["success"] is False and "error" in body + + +def test_classify_failure_various_cases(): + from app import _classify_failure + + assert _classify_failure(130, "interrupted") == "interrupted" + assert _classify_failure(124, "timeout") == "timeout" + assert _classify_failure(126, "permission denied") == "permission_error" + assert _classify_failure(127, "command not found") == "dependency_error" + assert _classify_failure(2, "some error") == "shell_error" + assert _classify_failure(None, "unknown stuff") == "unknown_failure" + + +def test_finalize_execution_writes_history_and_failed(app_module, tmp_path): + # prepare environment + hist = tmp_path / "history.jsonl" + failed = tmp_path / "failed.jsonl" + exec_dir = tmp_path / "exec" + sess_dir = tmp_path / "sessions" + exec_dir.mkdir() + sess_dir.mkdir() + + with patch.object(app_module, "HISTORY_FILE", str(hist)), patch.object(app_module, "FAILED_HISTORY_FILE", str(failed)), patch.object(app_module, "EXECUTION_LOG_DIR", str(exec_dir)), patch.object(app_module, "SESSION_LOG_DIR", str(sess_dir)): + # create a dummy execution + log_file = exec_dir / "e.log" + handle = open(log_file, "w", encoding="utf-8") + execution = { + "record": { + "id": "x1", + "kind": "script", + "session_file": "s1.json", + "display_name": "d", + "command": "c", + "shell": "sh", + "cwd": ".", + "arguments": [], + "started_at": app_module._iso_now(), + "log_file": str(log_file.name), + }, + "excerpt_lines": ["line1", "line2"], + "excerpt_size": 10, + "session_data": {"metadata": {}, "events": []}, + "handle": handle, + "monotonic_start": 0, + } + + # call finalize as failed + hist_rec = app_module._finalize_execution(execution, success=False, exit_code=2, duration_seconds=0.5, error_message="fail") + assert hist_rec["success"] is False + h = app_module._read_jsonl(str(hist)) + f = app_module._read_jsonl(str(failed)) + assert any(r.get("id") == "x1" for r in h) + assert any(r.get("id") == "x1" for r in f) + + +def test_cleanup_execution_handles_handle_close_exceptions(app_module, tmp_path): + # simulate a handle that raises on flush/close + class BadHandle: + closed = False + + def write(self, *a, **k): + pass + + def flush(self): + raise OSError("flush fail") + + def close(self): + raise OSError("close fail") + + execution = {"handle": BadHandle(), "record": {"session_file": "s.json"}, "monotonic_start": 0} + app_module._cleanup_execution(None, execution, run_id="r1", reader_thread=None) + + +def test_run_script_reader_thread_exception_triggers_cleanup(client, app_module, tmp_path): + pass + + +def test_run_script_timeoutexpired_branch_yields_timeout_error(client, app_module, tmp_path): + scripts_dir = tmp_path / "scripts2" + scripts_dir.mkdir() + script_path = scripts_dir / 'tmp_timeout.sh' + script_path.write_text('#!/bin/sh\necho hi\n') + + class EmptyStdout: + def readline(self): + return "" + + def close(self): + pass + + class FakePopen2: + pid = None + def __init__(self): + self.stdout = EmptyStdout() + self.returncode = 0 + self.pid = None + + def poll(self): + return 0 + + def wait(self, timeout=None): + raise subprocess.TimeoutExpired(cmd='fake', timeout=timeout) + + with patch.object(app_module, "SCRIPTS_DIR", str(scripts_dir)), patch("subprocess.Popen", return_value=FakePopen2()): + resp = client.post('/api/scripts/run', json={'path': 'tmp_timeout.sh'}) + assert resp.status_code == 200 + found_timeout = False + for chunk in resp.iter_encoded(): + text = chunk.decode('utf-8') + if 'execution timed out' in text.lower() or 'timed out' in text.lower(): + found_timeout = True + break + + assert found_timeout diff --git a/tests/test_security.py b/tests/test_security.py index 43d0bcc..6461af3 100644 --- a/tests/test_security.py +++ b/tests/test_security.py @@ -151,3 +151,176 @@ def test_migration_save_safety(app_module, tmp_path): # check_lock should still verify password and return True assert app_module.check_lock(rel_path, password) is True +def test_validate_safe_path_rejects_traversal(app_module, tmp_path): + base = tmp_path / "base" + base.mkdir() + + # traversal should be rejected + with pytest.raises(ValueError): + app_module.validate_safe_path(str(base), "../outside.txt") + + +def test_validate_safe_path_allows_normal_paths(app_module, tmp_path): + base = tmp_path / "base2" + base.mkdir() + (base / "ok").mkdir() + + resolved = app_module.validate_safe_path(str(base), "ok/file.txt") + assert str(resolved).startswith(str(base)) + + +def test_import_github_rejects_non_github_urls(client): + resp = client.post("/api/scripts/import_github", json={ + "url": "http://example.com/script.sh", + "category": "cat", + "filename": "script.sh", + }) + assert resp.status_code == 400 + + +def test_import_github_blocks_non_http_schemes(client): + resp = client.post("/api/scripts/import_github", json={ + "url": "ftp://raw.githubusercontent.com/owner/repo/branch/file.sh", + "category": "cat", + "filename": "script.sh", + }) + assert resp.status_code == 400 + + +def test_import_github_rejects_large_and_binary_payloads(app_module, client): + # Prepare a large payload (>500KB) + large = b"A" * 600_000 + + class DummyResp: + def __init__(self, data): + self._data = data + + def read(self): + return self._data + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class FakeOpener: + def open(self, req, timeout=10): + return DummyResp(large) + + with patch.object(app_module.urllib.request, "build_opener", return_value=FakeOpener()): + resp = client.post("/api/scripts/import_github", json={ + "url": "https://raw.githubusercontent.com/owner/repo/branch/file.sh", + "category": "cat", + "filename": "script.sh", + }) + assert resp.status_code == 400 + + +def test_import_github_handles_non_utf8(app_module, client): + bad = b"\xff\xff\xff" + + class DummyResp2: + def __init__(self, data): + self._data = data + + def read(self): + return self._data + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + class FakeOpener2: + def open(self, req, timeout=10): + return DummyResp2(bad) + + with patch.object(app_module.urllib.request, "build_opener", return_value=FakeOpener2()): + resp = client.post("/api/scripts/import_github", json={ + "url": "https://raw.githubusercontent.com/owner/repo/branch/file.sh", + "category": "cat", + "filename": "script.sh", + }) + assert resp.status_code == 400 + + +def test_save_workspace_state_validation_and_failure(app_module, tmp_path): + # invalid payload + ok, err = app_module.save_workspace_state("not a dict") + assert ok is False + + valid = {"terminals": []} + with patch("builtins.open", side_effect=Exception("disk full")): + ok2, err2 = app_module.save_workspace_state(valid) + assert ok2 is False + + +def test_save_locks_and_favorites_persist(app_module, tmp_path): + locks_file = tmp_path / "locks.json" + favs_file = tmp_path / "favs.json" + + with patch.object(app_module, "LOCKS_FILE", str(locks_file)), patch.object(app_module, "FAVORITES_FILE", str(favs_file)): + app_module.save_locks({"p": "x"}) + assert app_module.load_locks() == {"p": "x"} + + app_module.save_favorites(["a", "b"]) + assert app_module.load_favorites() == ["a", "b"] + + +def test_save_script_path_validation_and_locked(app_module, client): + # Force check_lock to fail to exercise locked response + with patch.object(app_module, "check_lock", return_value=False): + resp = client.post("/api/scripts/save", json={ + "category": "cat", + "filename": "evil.sh", + "content": "echo hi", + "password": "", + }) + assert resp.status_code == 401 + + +def test_delete_script_removes_favorites_and_locks(app_module, client, tmp_path): + # Setup tmp scripts dir and files + scripts_dir = tmp_path / "scripts" + scripts_dir.mkdir() + rel = "cat/test.sh" + full = scripts_dir / "cat" + full.mkdir() + (full / "test.sh").write_text("echo hi") + + locks_file = tmp_path / "locks.json" + favs_file = tmp_path / "favs.json" + app_module.save_locks({rel: app_module.generate_password_hash("p")}) + app_module.save_favorites([rel]) + + with patch.object(app_module, "SCRIPTS_DIR", str(scripts_dir)), patch.object(app_module, "LOCKS_FILE", str(locks_file)), patch.object(app_module, "FAVORITES_FILE", str(favs_file)): + # ensure check_lock allows deletion + with patch.object(app_module, "check_lock", return_value=True): + resp = client.delete("/api/scripts/delete", json={"path": rel}) + assert resp.status_code == 200 + assert not (scripts_dir / rel).exists() + + +def test_manage_lock_set_and_remove(app_module, client, tmp_path): + locks_file = tmp_path / "locks.json" + with patch.object(app_module, "LOCKS_FILE", str(locks_file)): + # set lock + resp = client.post("/api/scripts/lock", json={"path": "c/s.sh", "old_password": "", "new_password": "secret"}) + assert resp.status_code == 200 + data = resp.get_json() + assert data["success"] is True + + # remove lock + resp2 = client.post("/api/scripts/lock", json={"path": "c/s.sh", "old_password": "secret", "new_password": ""}) + assert resp2.status_code == 200 + data2 = resp2.get_json() + assert data2["success"] is True + + +def test_exec_command_requires_terminal_unlock(app_module, client): + with patch.object(app_module, "check_lock", return_value=False): + resp = client.post("/api/exec", json={"command": "echo 1", "password": ""}) + assert resp.status_code == 401 + diff --git a/tests/test_sse.py b/tests/test_sse.py index b367d5a..b44ecb4 100644 --- a/tests/test_sse.py +++ b/tests/test_sse.py @@ -1,5 +1,8 @@ import json import pytest +from unittest.mock import patch + +import os def test_sse_run_and_kill(client): # Trigger run on test/long.sh @@ -53,3 +56,49 @@ def test_sse_run_and_kill(client): break assert aborted_found + + +def test_run_script_not_found_returns_404(client): + r = client.post('/api/scripts/run', json={'path': 'no/such.sh'}) + assert r.status_code == 404 + body = r.get_json() + assert 'error' in body + + +def test_run_script_popen_error_stream_returns_error_and_cleans(client, app_module, tmp_path): + # create a small script in the scripts dir + scripts_dir = app_module.SCRIPTS_DIR + os.makedirs(scripts_dir, exist_ok=True) + script_path = os.path.join(scripts_dir, 'tmp_test.sh') + with open(script_path, 'w', encoding='utf-8') as f: + f.write('#!/bin/sh\necho hello\n') + + # make Popen raise to trigger exception path and cleanup + with patch('subprocess.Popen', side_effect=OSError('spawn failed')): + resp = client.post('/api/scripts/run', json={'path': 'tmp_test.sh'}) + assert resp.status_code == 200 + # iterate stream to find error message + found_error = False + for chunk in resp.iter_encoded(): + text = chunk.decode('utf-8') + for line in text.split('\n'): + if line.startswith('data: '): + try: + data = json.loads(line[6:]) + if data.get('type') == 'error': + found_error = True + break + except Exception: + pass + if found_error: + break + + assert found_error + assert not app_module.active_processes + + +def test_kill_missing_and_unknown_run_id(client): + r = client.post('/api/scripts/kill', json={}) + assert r.status_code == 400 + r2 = client.post('/api/scripts/kill', json={'run_id': 'nope'}) + assert r2.status_code == 404 diff --git a/tests/test_validators_and_jsonl.py b/tests/test_validators_and_jsonl.py new file mode 100644 index 0000000..3b652f5 --- /dev/null +++ b/tests/test_validators_and_jsonl.py @@ -0,0 +1,69 @@ +import os +import json +import pytest + +from utils import validators + + +def test_validate_safe_path_ok(tmp_path): + base = tmp_path / "base" + base.mkdir() + child = base / "subdir" + child.mkdir() + p = validators.validate_safe_path(str(base), "subdir/file.txt") + assert str(base) in str(p) + + +def test_validate_safe_path_empty(): + with pytest.raises(ValueError): + validators.validate_safe_path("/tmp", "") + + +def test_validate_safe_path_traversal(tmp_path): + base = tmp_path / "base" + base.mkdir() + with pytest.raises(ValueError): + validators.validate_safe_path(str(base), "../etc/passwd") + + +def test_validate_git_branch_valid(): + assert validators.validate_git_branch("feature/x") == "feature/x" + + +@pytest.mark.parametrize("bad", ["", "-bad", "bad..name", "bad/", "name.lock", "bad*name"]) +def test_validate_git_branch_invalid(bad): + with pytest.raises(ValueError): + validators.validate_git_branch(bad) + + +def test_validate_repo_name_valid(): + assert validators.validate_repo_name("git@github.com:owner/repo.git") + + +@pytest.mark.parametrize("bad", ["", "-bad", "bad name", "bad$name"]) +def test_validate_repo_name_invalid(bad): + with pytest.raises(ValueError): + validators.validate_repo_name(bad) + + +def test_append_and_read_jsonl(app_module, tmp_path): + f = tmp_path / "logs.jsonl" + records = [{"a": 1}, {"b": 2}] + for r in records: + app_module._append_jsonl(str(f), r) + + got = app_module._read_jsonl(str(f)) + assert len(got) == 2 + assert got[0]["a"] == 1 + + +def test_read_jsonl_ignores_invalid_lines(app_module, tmp_path): + f = tmp_path / "mix.jsonl" + with open(f, "w", encoding="utf-8") as fh: + fh.write("{\"ok\": 1}\n") + fh.write("not a json\n") + fh.write("{\"ok\": 2}\n") + + got = app_module._read_jsonl(str(f)) + assert len(got) == 2 + assert got[1]["ok"] == 2