diff --git a/queue_core.py b/queue_core.py index ab1c5e9..a846e18 100644 --- a/queue_core.py +++ b/queue_core.py @@ -52,6 +52,7 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": pid INTEGER, server_id TEXT, child_pid INTEGER, + command TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) @@ -62,6 +63,11 @@ def from_data_dir(cls, data_dir: Path) -> "QueuePaths": ALTER TABLE queue ADD COLUMN server_id TEXT """ +# Migration to add command column to existing databases +QUEUE_MIGRATION_COMMAND = """ +ALTER TABLE queue ADD COLUMN command TEXT +""" + QUEUE_INDEX = """ CREATE INDEX IF NOT EXISTS idx_queue_status ON queue(queue_name, status) """ @@ -91,11 +97,12 @@ def init_db(paths: QueuePaths): with get_db(paths.db_path) as conn: conn.execute(QUEUE_SCHEMA) conn.execute(QUEUE_INDEX) - # Run migration for existing databases without server_id column - try: - conn.execute(QUEUE_MIGRATION_SERVER_ID) - except sqlite3.OperationalError: - pass # Column already exists + # Run migrations for existing databases + for migration in [QUEUE_MIGRATION_SERVER_ID, QUEUE_MIGRATION_COMMAND]: + try: + conn.execute(migration) + except sqlite3.OperationalError: + pass # Column already exists def ensure_db(paths: QueuePaths): diff --git a/tests/test_tq_cli.py b/tests/test_tq_cli.py index 7733c40..15c0290 100644 --- a/tests/test_tq_cli.py +++ b/tests/test_tq_cli.py @@ -4,9 +4,12 @@ """ import json +import os +import signal import subprocess import sys import tempfile +import time from pathlib import Path import pytest @@ -153,6 +156,176 @@ def test_list_no_database(self, temp_data_dir): assert result.returncode == 0 assert "No queue database" in result.stdout or "empty" in result.stdout.lower() + def test_list_json_empty_queue(self, temp_data_dir): + """Test list --json command with DB exists but queue is empty.""" + # Initialize DB by running a task that completes + run_tq("echo", "init", data_dir=temp_data_dir) + + result = run_tq("list", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == { + "tasks": [], + "summary": {"total": 0, "running": 0, "waiting": 0} + } + + def test_list_json_no_database(self, temp_data_dir): + """Test list --json command when database doesn't exist.""" + result = run_tq("list", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output["tasks"] == [] + assert output["summary"]["total"] == 0 + + +class TestJsonSchemaContracts: + """ + Schema contract tests for JSON output. + + These tests ensure the JSON structure remains stable for programmatic consumers + (e.g., Claude Code status lines). Any changes to these schemas should be + intentional and backward-compatible. + """ + + # Expected fields for each schema - used to enforce contracts + LIST_REQUIRED_KEYS = {"tasks", "summary"} + LIST_SUMMARY_REQUIRED_KEYS = {"total", "running", "waiting"} + LIST_TASK_REQUIRED_KEYS = {"id", "queue_name", "status", "command", "pid", "child_pid", "created_at", "updated_at"} + + LOGS_REQUIRED_KEYS = {"entries"} + LOGS_ENTRY_REQUIRED_KEYS = {"event", "timestamp"} # Base keys all entries must have + + CLEAR_REQUIRED_KEYS = {"cleared", "success"} + + def test_list_json_schema_empty(self, temp_data_dir): + """Verify list --json schema structure when empty.""" + result = run_tq("list", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + # Top-level keys + assert set(output.keys()) == self.LIST_REQUIRED_KEYS, \ + f"list --json must have exactly keys {self.LIST_REQUIRED_KEYS}" + + # Summary keys + assert set(output["summary"].keys()) == self.LIST_SUMMARY_REQUIRED_KEYS, \ + f"list --json summary must have exactly keys {self.LIST_SUMMARY_REQUIRED_KEYS}" + + # Type checks + assert isinstance(output["tasks"], list) + assert isinstance(output["summary"]["total"], int) + assert isinstance(output["summary"]["running"], int) + assert isinstance(output["summary"]["waiting"], int) + + def test_list_json_schema_with_running_task(self, temp_data_dir): + """Verify list --json task schema with an active task.""" + # Start a long-running task + proc = subprocess.Popen( + [sys.executable, str(TQ_PATH), f"--data-dir={temp_data_dir}", "sleep", "30"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + start_new_session=True, + ) + + try: + # Wait for it to start + time.sleep(0.5) + + result = run_tq("list", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + # Verify structure + assert set(output.keys()) == self.LIST_REQUIRED_KEYS + assert len(output["tasks"]) >= 1 + + # Verify task object schema + task = output["tasks"][0] + assert set(task.keys()) == self.LIST_TASK_REQUIRED_KEYS, \ + f"Task object must have exactly keys {self.LIST_TASK_REQUIRED_KEYS}, got {set(task.keys())}" + + # Verify task field types + assert isinstance(task["id"], int) + assert isinstance(task["queue_name"], str) + assert task["status"] in ("running", "waiting") + assert task["command"] is None or isinstance(task["command"], str) + assert task["pid"] is None or isinstance(task["pid"], int) + assert task["child_pid"] is None or isinstance(task["child_pid"], int) + assert task["created_at"] is None or isinstance(task["created_at"], str) + assert task["updated_at"] is None or isinstance(task["updated_at"], str) + + # Verify command is populated for the running task + assert task["command"] == "sleep 30", f"Expected command 'sleep 30', got {task['command']}" + + # Verify summary counts are accurate + assert output["summary"]["total"] == len(output["tasks"]) + running_count = sum(1 for t in output["tasks"] if t["status"] == "running") + waiting_count = sum(1 for t in output["tasks"] if t["status"] == "waiting") + assert output["summary"]["running"] == running_count + assert output["summary"]["waiting"] == waiting_count + + finally: + # Clean up + try: + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + except Exception: + proc.terminate() + proc.wait(timeout=5) + + def test_logs_json_schema_empty(self, temp_data_dir): + """Verify logs --json schema structure when empty.""" + result = run_tq("logs", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + assert set(output.keys()) == self.LOGS_REQUIRED_KEYS, \ + f"logs --json must have exactly keys {self.LOGS_REQUIRED_KEYS}" + assert isinstance(output["entries"], list) + + def test_logs_json_schema_with_entries(self, temp_data_dir): + """Verify logs --json entry schema with actual log entries.""" + # Generate some logs + run_tq("echo", "test", data_dir=temp_data_dir) + + result = run_tq("logs", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + assert set(output.keys()) == self.LOGS_REQUIRED_KEYS + assert len(output["entries"]) >= 3 # queued, started, completed + + # Verify each entry has required base keys + for entry in output["entries"]: + assert self.LOGS_ENTRY_REQUIRED_KEYS.issubset(set(entry.keys())), \ + f"Log entry must have at least keys {self.LOGS_ENTRY_REQUIRED_KEYS}, got {set(entry.keys())}" + assert isinstance(entry["event"], str) + assert isinstance(entry["timestamp"], str) + + # Verify specific event schemas + for entry in output["entries"]: + if entry["event"] == "task_queued": + assert "task_id" in entry + assert "queue_name" in entry + elif entry["event"] == "task_started": + assert "task_id" in entry + assert "queue_name" in entry + assert "wait_time_seconds" in entry + elif entry["event"] == "task_completed": + assert "task_id" in entry + assert "queue_name" in entry + assert "exit_code" in entry + assert "duration_seconds" in entry + + def test_clear_json_schema(self, temp_data_dir): + """Verify clear --json schema structure.""" + result = run_tq("clear", "--json", data_dir=temp_data_dir) + output = json.loads(result.stdout) + + assert set(output.keys()) == self.CLEAR_REQUIRED_KEYS, \ + f"clear --json must have exactly keys {self.CLEAR_REQUIRED_KEYS}" + assert isinstance(output["cleared"], int) + assert isinstance(output["success"], bool) + assert output["cleared"] >= 0 + assert output["success"] is True + class TestTqLogs: """Tests for the tq logs command.""" @@ -189,6 +362,43 @@ def test_logs_n_option(self, temp_data_dir): lines = [line for line in result.stdout.strip().split("\n") if line] assert len(lines) == 3 + def test_logs_json_no_file(self, temp_data_dir): + """Test logs --json command when no log file exists.""" + result = run_tq("logs", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == {"entries": []} + + def test_logs_json_shows_activity(self, temp_data_dir): + """Test logs --json command shows task activity.""" + # Run a task first to generate logs + run_tq("echo", "test", data_dir=temp_data_dir) + + result = run_tq("logs", "--json", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert "entries" in output + assert len(output["entries"]) >= 3 # queued, started, completed + + events = [e["event"] for e in output["entries"]] + assert "task_queued" in events + assert "task_started" in events + assert "task_completed" in events + + def test_logs_json_n_option(self, temp_data_dir): + """Test logs --json -n option to limit entries.""" + # Run multiple tasks + for i in range(5): + run_tq("echo", f"test {i}", data_dir=temp_data_dir) + + result = run_tq("logs", "--json", "-n", "3", data_dir=temp_data_dir) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert len(output["entries"]) == 3 + class TestTqClear: """Tests for the tq clear command.""" @@ -203,6 +413,25 @@ def test_clear_empty_queue(self, temp_data_dir): assert result.returncode == 0 assert "already empty" in result.stdout.lower() + def test_clear_json_empty_queue(self, temp_data_dir): + """Test clear --json command with empty queue.""" + # Initialize database by running a task that completes + run_tq("echo", "init", data_dir=temp_data_dir) + + result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == {"cleared": 0, "success": True} + + def test_clear_json_no_database(self, temp_data_dir): + """Test clear --json command when no database exists.""" + result = run_tq("clear", "--json", data_dir=temp_data_dir, timeout=5) + + assert result.returncode == 0 + output = json.loads(result.stdout) + assert output == {"cleared": 0, "success": True} + class TestTqHelp: """Tests for help output.""" diff --git a/tq.py b/tq.py index 28a95c9..9ab1e50 100644 --- a/tq.py +++ b/tq.py @@ -51,10 +51,14 @@ def get_paths(args) -> QueuePaths: def cmd_list(args): """List all tasks in the queue.""" paths = get_paths(args) + json_output = getattr(args, "json", False) if not paths.db_path.exists(): - print(f"No queue database found at {paths.db_path}") - print("Queue is empty (no tasks have been run yet)") + if json_output: + print(json.dumps({"tasks": [], "summary": {"total": 0, "running": 0, "waiting": 0}})) + else: + print(f"No queue database found at {paths.db_path}") + print("Queue is empty (no tasks have been run yet)") return conn = sqlite3.connect(paths.db_path, timeout=5.0) @@ -65,6 +69,38 @@ def cmd_list(args): "SELECT * FROM queue ORDER BY queue_name, id" ).fetchall() + if json_output: + tasks = [] + running_count = 0 + waiting_count = 0 + for row in rows: + task = { + "id": row["id"], + "queue_name": row["queue_name"], + "status": row["status"], + "command": row["command"] if "command" in row.keys() else None, + "pid": row["pid"], + "child_pid": row["child_pid"], + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + tasks.append(task) + if row["status"] == "running": + running_count += 1 + elif row["status"] == "waiting": + waiting_count += 1 + + output = { + "tasks": tasks, + "summary": { + "total": len(tasks), + "running": running_count, + "waiting": waiting_count, + }, + } + print(json.dumps(output)) + return + if not rows: print("Queue is empty") return @@ -106,9 +142,13 @@ def cmd_list(args): def cmd_clear(args): """Clear all tasks from the queue.""" paths = get_paths(args) + json_output = getattr(args, "json", False) if not paths.db_path.exists(): - print("No queue database found") + if json_output: + print(json.dumps({"cleared": 0, "success": True})) + else: + print("No queue database found") return conn = sqlite3.connect(paths.db_path, timeout=5.0) @@ -116,17 +156,26 @@ def cmd_clear(args): # Check how many tasks exist count = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0] if count == 0: - print("Queue is already empty") + if json_output: + print(json.dumps({"cleared": 0, "success": True})) + else: + print("Queue is already empty") return - response = input(f"Clear {count} task(s) from queue? [y/N] ") - if response.lower() != 'y': - print("Cancelled") - return + # JSON mode skips confirmation (implies --force) + if not json_output: + response = input(f"Clear {count} task(s) from queue? [y/N] ") + if response.lower() != 'y': + print("Cancelled") + return cursor = conn.execute("DELETE FROM queue") conn.commit() - print(f"Cleared {cursor.rowcount} task(s) from queue") + + if json_output: + print(json.dumps({"cleared": cursor.rowcount, "success": True})) + else: + print(f"Cleared {cursor.rowcount} task(s) from queue") finally: conn.close() @@ -134,14 +183,30 @@ def cmd_clear(args): def cmd_logs(args): """Show recent log entries.""" paths = get_paths(args) + json_output = getattr(args, "json", False) if not paths.metrics_path.exists(): - print(f"No log file found at {paths.metrics_path}") + if json_output: + print(json.dumps({"entries": []})) + else: + print(f"No log file found at {paths.metrics_path}") return lines = paths.metrics_path.read_text().strip().split("\n") recent = lines[-args.n:] if len(lines) > args.n else lines + if json_output: + entries = [] + for line in recent: + try: + entry = json.loads(line) + entries.append(entry) + except json.JSONDecodeError: + # Skip malformed lines in JSON mode + pass + print(json.dumps({"entries": entries})) + return + for line in recent: try: entry = json.loads(line) @@ -214,13 +279,13 @@ def cleanup_queue(conn, queue_name: str, paths: QueuePaths): print(f"[tq] WARNING: Cleared task from old CLI instance (ID: {task['id']}, old_instance: {task['server_id']})") -def register_task(conn, queue_name: str, paths: QueuePaths) -> int: +def register_task(conn, queue_name: str, paths: QueuePaths, command: str = None) -> int: """Register a task in the queue. Returns task_id immediately.""" my_pid = os.getpid() cursor = conn.execute( - "INSERT INTO queue (queue_name, status, pid, server_id) VALUES (?, ?, ?, ?)", - (queue_name, "waiting", my_pid, CLI_INSTANCE_ID), + "INSERT INTO queue (queue_name, status, pid, server_id, command) VALUES (?, ?, ?, ?, ?)", + (queue_name, "waiting", my_pid, CLI_INSTANCE_ID, command), ) conn.commit() task_id = cursor.lastrowid @@ -364,7 +429,7 @@ def cleanup_handler(signum, frame): cleanup_queue(conn, queue_name, paths) # Register task first so task_id is available for cleanup if interrupted - task_id = register_task(conn, queue_name, paths) + task_id = register_task(conn, queue_name, paths, command=command) wait_for_turn(conn, queue_name, task_id, paths) print(f"[tq] Running: {command}") @@ -477,14 +542,17 @@ def main(): run_parser.add_argument("run_command", nargs=argparse.REMAINDER, metavar="COMMAND", help="Command to run") # list - subparsers.add_parser("list", help="List tasks in queue") + list_parser = subparsers.add_parser("list", help="List tasks in queue") + list_parser.add_argument("--json", action="store_true", help="Output in JSON format") # clear - subparsers.add_parser("clear", help="Clear all tasks from queue") + clear_parser = subparsers.add_parser("clear", help="Clear all tasks from queue") + clear_parser.add_argument("--json", action="store_true", help="Output in JSON format and skip confirmation") # logs logs_parser = subparsers.add_parser("logs", help="Show recent log entries") logs_parser.add_argument("-n", type=int, default=20, help="Number of entries (default: 20)") + logs_parser.add_argument("--json", action="store_true", help="Output in JSON format") # Handle implicit run: tq ./gradlew build -> tq run ./gradlew build # Pre-process argv to insert 'run' if needed